# Quick Start

From [Quick Start](https://spark.apache.org/docs/latest/quick-start.html)

In [10]:
# Print environment information

import sys
import os

print(f"Python version:\n\t{sys.version}")
print("\n\n")
print(f"Environment:\n\t{repr(os.environ)}")

Python version:
	3.7.0 (default, Jul 10 2018, 07:48:31) 
[Clang 9.1.0 (clang-902.0.39.2)]



Environment:
	environ({'SPARK_HOME': '/usr/local/Cellar/apache-spark/2.3.2/libexec', 'TERM_PROGRAM': 'iTerm.app', 'rvm_bin_path': '/Users/dra/.rvm/bin', 'GEM_HOME': '/Users/dra/.rvm/gems/ruby-2.5.1@ios', 'NVM_CD_FLAGS': '', 'SHELL': '/usr/local/bin/fish', 'TERM': 'xterm-color', 'OMF_CONFIG': '/Users/dra/.config/omf', 'TMPDIR': '/var/folders/60/9grv09hj6c39tv4rlmyjc4pm94m_nc/T/', 'Apple_PubSub_Socket_Render': '/private/tmp/com.apple.launchd.uy4QaOFFql/Render', 'TERM_PROGRAM_VERSION': '3.2.5', 'TERM_SESSION_ID': 'w0t0p0:7C79ECDD-608F-4371-85BB-54AD8841C156', 'SPARK_CONF_DIR': '/usr/local/Cellar/apache-spark/2.3.2/libexec/conf', 'NVM_DIR': '/Users/dra/.nvm', 'USER': 'dra', 'COMMAND_MODE': 'unix2003', 'PYSPARK_PYTHON': 'python', 'rvm_path': '/Users/dra/.rvm', 'PYSPARK_DRIVER_PYTHON': 'jupyter', 'SSH_AUTH_SOCK': '/private/tmp/com.apple.launchd.nLcGXejecb/Listeners', '__CF_USER_TEXT_ENCODING': '0x124

In [11]:
# Print the spark context.
#
# pyspark will create a SparkContext. You can configure the SparkContext by passing command line args to `pyspark`
#
# Adds test.py to the path - which can be imported later.
#
# $ pyspark --py-files test.py
#
# You can customize the Jupyter commands using the PYSPARK_DRIVER_PYTHON_OPTS environment variables.
# 
# These variables are set at: ~/.config/fish/config.fish
# set --export PYSPARK_DRIVER_PYTHON jupyter
# set --export PYSPARK_DRIVER_PYTHON_OPTS 'notebook'

spark

In [12]:
# 
# DataFrame
#
# A DataFrame is a Dataset (Scala, Java API) organized into named columns.
# it is conceptually equivalent to a table in a relational database or a data frame in R / Python.
# 
# Spark supports creating DataFrame(s) using multiple input sources:
# 
# * Text files (text, json, parquet, csv)
# * JDBC
# * Hive
#
#
# Creates a DataFrame by reading in a textfile of lines.
# 
# NOTE: For local files, the file must also be accessible at the same path on worker nodes. Either
#       use HDFS or a network file share when operating in a cluster.
#
namesDF = spark.read.text("data/names.txt")



In [13]:
#
# Exploring a DataFrame
#

namesDF.printSchema()

print(f"count == {namesDF.count()}")

# Examining data
namesDF.select("value").show()
namesDF.select("value").take(10)


root
 |-- value: string (nullable = true)

count == 100000
+---------------+
|          value|
+---------------+
|   Sharon Payne|
| Clayton Harper|
| Joanne Johnson|
|   Kelly Wilson|
| Taylor Sanford|
|   Sarah Miller|
|     Sean Barry|
|     Adam Smith|
|   George Mccoy|
|Jeffrey Gregory|
|   Charles Hall|
|  Pamela Wright|
|    Bryan Scott|
|  William Wells|
|    Stacy Moore|
|  Jeremy Barber|
| Joshua Murillo|
|Dr. Kendra Hall|
|  Angela Warren|
|  Marcus Brooks|
+---------------+
only showing top 20 rows



[Row(value='Sharon Payne'),
 Row(value='Clayton Harper'),
 Row(value='Joanne Johnson'),
 Row(value='Kelly Wilson'),
 Row(value='Taylor Sanford'),
 Row(value='Sarah Miller'),
 Row(value='Sean Barry'),
 Row(value='Adam Smith'),
 Row(value='George Mccoy'),
 Row(value='Jeffrey Gregory')]

In [16]:
#
# DataFrames support two types of operations:
#
# * Transformations (map, filter) - return new DataFrames.
# * Actions (reduce) - collects results.
#
# Transformations are lazy - they are not executed until an action is performed.
#
# Create a new Dataset by filtering the existing one.
#
from pyspark.sql.functions import *

# 
# Create a new DataFrame by filtering the original. Note this DF is not 
# materialized until the .count() action occurs.
# 
# Regex filter for `mike`, effectively case insensitive.
#

mikeDF = namesDF.filter(lower(namesDF["value"]).rlike("mike"))
print(f"Mike count is: {mikeDF.count()}")


Mike count is: 68


In [17]:
# Find the line with the most words
namesDF.select(size(split(namesDF.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()

[Row(max(numWords)=4)]

In [18]:
# MapReduce Example
# 
# Use the explode function in select to transform a Dataset of lines into a Dataset of words, and then combine
# them with groupBy and count to compute the per-word countes in the file as a Dataset of 2 columns - "word" and "count"

wordCountsDF = namesDF.select(explode(split(namesDF.value, "\s+")).alias("word")).groupBy("word").count()
wordCountsDF.printSchema()
wordCountsDF.sort(col("count"), ascending=False).take(10)

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)



[Row(word='Michael', count=2270),
 Row(word='Smith', count=2133),
 Row(word='James', count=1705),
 Row(word='Johnson', count=1639),
 Row(word='David', count=1584),
 Row(word='John', count=1443),
 Row(word='Jennifer', count=1428),
 Row(word='Thomas', count=1422),
 Row(word='Williams', count=1403),
 Row(word='Christopher', count=1395)]

In [19]:
# Cache the dataset into a cluster-wide in-memory cache.
# 
# This is useful when data is accessed multiple times.

wordCountsDf.cache()


NameError: name 'wordCountsDf' is not defined

In [101]:
#
# Perform SQL operations on a DataFrame, returning results as a new DataFrame.
# 
# Note: SQL operations can only be executed on DataFrames which have been registered
# as a table.
# 

# Register the DataFrame as a temporary view.
#
# Temporary views are session-scoped. To have a temporary view that is shared among
# all sessions and kept alive until Spark terminates, use `createOrReplaceGlobalTempView`.
# 
namesDF.createOrReplaceTempView("names")

sqlDF = spark.sql("SELECT * FROM names")

sqlDF.show()


+---------------+
|          value|
+---------------+
|   Sharon Payne|
| Clayton Harper|
| Joanne Johnson|
|   Kelly Wilson|
| Taylor Sanford|
|   Sarah Miller|
|     Sean Barry|
|     Adam Smith|
|   George Mccoy|
|Jeffrey Gregory|
|   Charles Hall|
|  Pamela Wright|
|    Bryan Scott|
|  William Wells|
|    Stacy Moore|
|  Jeremy Barber|
| Joshua Murillo|
|Dr. Kendra Hall|
|  Angela Warren|
|  Marcus Brooks|
+---------------+
only showing top 20 rows

