# Apache Spark

**Analyze large datasets on clusters using Apache Spark**

This notebook is intended to run in Google Colab.
Skip the Colab Setup cells below if running locally.

## Colab Setup

In [None]:
# Pick a Spark release from https://archive.apache.org/dist/spark/ (older releases are removed from the download mirrors)
spark_version = 'spark-3.0.1'

In [None]:
# Set environment variables
import os
os.environ['SPARK_VERSION'] = spark_version
os.environ['BASE_URL'] = 'https://archive.apache.org/dist/spark'
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Install Java, download and unpack Spark, and install findspark
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q $BASE_URL/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Make pyspark importable by adding SPARK_HOME to sys.path
import findspark
findspark.init()



## Introduction to Spark

### Start session

Create a `SparkSession`, the entry point for working with DataFrames in PySpark.

In [None]:
from pyspark.sql import SparkSession


# Start spark session
spark = SparkSession\
    .builder\
    .appName("DataFrameBasics")\
    .getOrCreate()

### Create a DataFrame

Create an example DataFrame from a list of tuples and a list of two column names.

In [None]:
# Example dataframe
df = spark.createDataFrame(
    [(0, 'A'),
     (1, 'B'), 
     (2, 'C'),], 
    ['id', 'words'])

# Show head of dataframe
df.show()

+---+-----+
| id|words|
+---+-----+
|  0|    A|
|  1|    B|
|  2|    C|
+---+-----+



### SparkFiles
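Load a CSV file hosted on Amazon S3: `SparkContext.addFile` downloads the file from its URL, and `SparkFiles.get` returns the local path of the downloaded copy.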

#### Food data

In [None]:
from pyspark import SparkFiles


# URL of the CSV file in S3
url = 'https://s3.amazonaws.com/dataviz-curriculum/day_1/food.csv'

# Download the file and register it with the SparkContext
spark.sparkContext.addFile(url)

# Create a DataFrame from the downloaded CSV, using the first row as column names
dataframe = spark.read.csv(
    SparkFiles.get('food.csv'), 
    header=True)

In [None]:
# Show dataframe
dataframe.show()

+-------+-----+
|   food|price|
+-------+-----+
|  pizza|    0|
|  sushi|   12|
|chinese|   10|
+-------+-----+



In [None]:
# Print schema
dataframe.printSchema()

root
 |-- food: string (nullable = true)
 |-- price: string (nullable = true)



In [None]:
# Show columns
dataframe.columns

['food', 'price']

In [None]:
# Describe data: describe() returns a new DataFrame of summary statistics; append .show() to print them
dataframe.describe()

DataFrame[summary: string, food: string, price: string]

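##### Transform data
By default `spark.read.csv` loads every column as a string, as the schema above shows. Reload the data with an explicit schema so that `price` is an integer.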

##### Change schema

In [None]:
from pyspark.sql.types import StructField as Field
from pyspark.sql.types import StringType, IntegerType, StructType


# Define the schema: one StructField (name, type, nullable) per column
schema = StructType(fields=[
    Field("food", StringType(), True), 
    Field("price", IntegerType(), True),
])

# Load dataframe with correct schema
dataframe = spark.read.csv(
    SparkFiles.get('food.csv'),
    schema=schema,
    header=True)

# Print schema
dataframe.printSchema()

root
 |-- food: string (nullable = true)
 |-- price: integer (nullable = true)



### Other DataFrame methods

Manipulate columns using the `withColumn` and `withColumnRenamed` methods. Each returns a new DataFrame and leaves the original unchanged.

In [None]:
# Add a column 'newprice' that copies 'price'
dataframe.withColumn(
    'newprice', dataframe['price']
).show()

+-------+-----+--------+
|   food|price|newprice|
+-------+-----+--------+
|  pizza|    0|       0|
|  sushi|   12|      12|
|chinese|   10|      10|
+-------+-----+--------+



In [None]:
# Rename 'price' to 'Price'
dataframe.withColumnRenamed('price','Price').show()

+-------+-----+
|   food|Price|
+-------+-----+
|  pizza|    0|
|  sushi|   12|
|chinese|   10|
+-------+-----+



In [None]:
# Use column arithmetic to create a new column
dataframe.withColumn(
    'DoublePrice', dataframe['price'] * 2
).show()

+-------+-----+-----------+
|   food|price|DoublePrice|
+-------+-----+-----------+
|  pizza|    0|          0|
|  sushi|   12|         24|
|chinese|   10|         20|
+-------+-----+-----------+

