# Introduction to (Py)Spark

* Tim Hopper – [@tdhopper](http://www.twitter.com/tdhopper) – Raleigh, NC
* Developer and Anecdote Scientist
* IPython Notebook available at http://tinyurl.com/PySpark

<center>![](http://ak.picdn.net/shutterstock/videos/5081630/preview/stock-footage-electrical-spark-and-smoke-between-two-insulated-copper-wires-looped.jpg) </center>

# Introduction to (Py)Spark

> Apache Spark™ is a fast and general engine for large-scale data processing.

> It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

[source](http://spark.apache.org/docs/latest/)

# Introduction to (Py)Spark

* Originally developed at Berkeley's AMPLab in 2009.
* BSD-ed in 2010.
* Donated to Apache in 2013.
* Apache Top-Level Project in 2014.
* 1.0.0 released in May 2014.
* Currently on 1.2.0 (released December 2014).
* Backed by Databricks (databricks.com).

## Example

Sum the squares of the integers from 1 to 10.

In [1]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("PySparkExample") \
    .getOrCreate()

# Access the Spark Context from the Spark Session
sc = spark.sparkContext


In [2]:
sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map(lambda x: x**2).sum()

385

## Example

In [3]:
import nltk
nltk.download('stopwords')

from nltk.corpus import stopwords
stopwords = set(stopwords.words("english"))


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [4]:
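try:
    import nltk.corpus as corpus
    # Restrict to English; words() with no argument loads every language
    stopwords = set(corpus.stopwords.words("english"))
except ImportError:
    # Fall back to an empty set so membership tests still work
    stopwords = set()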

In [None]:
# Import the necessary module to upload files in Colab
from google.colab import files
import re

# Upload the file
uploaded = files.upload()


In [None]:
# Assuming you have the stopwords list defined
from nltk.corpus import stopwords
import re

# Define stopwords or use an existing list like nltk's
stopwords = set(stopwords.words('english'))

# Adjust the filename based on the uploaded file
filename = next(iter(uploaded.keys()))

# Now, use your RDD transformation pipeline with Python 3 compatible syntax
rdd = sc.textFile(filename)
common_words = (rdd
                .flatMap(lambda line: re.findall(r'\w+', line.lower()))  # Split by words and lowercase
                .filter(lambda word: word not in stopwords)              # Remove stopwords
                .map(lambda word: (word, 1))                             # Map each word to (word, 1)
                .reduceByKey(lambda a, b: a + b)                         # Sum occurrences
                .map(lambda x: (x[1], x[0]))                             # Swap to (count, word) for sorting
                .top(10))                                                # Get top 10 by count

# Display the result
print("Most common words:", common_words)
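
To make each step of the pipeline above easier to follow, here is a plain-Python sketch of the same word count. It does not use Spark; `sample_lines` and `sample_stopwords` are made-up stand-ins for the uploaded file and the NLTK stopword list.

```python
import re
from collections import Counter

# Hypothetical sample text and stopword list (assumptions for illustration)
sample_lines = [
    "The spark jumped and the wire smoked",
    "A spark is a spark",
]
sample_stopwords = {"the", "a", "and", "is"}

# flatMap: every line yields zero or more lowercase words
words = [w for line in sample_lines for w in re.findall(r"\w+", line.lower())]

# filter: drop stopwords
kept = [w for w in words if w not in sample_stopwords]

# map + reduceByKey: count occurrences per word
counts = Counter(kept)

# swap to (count, word) pairs and keep the two largest, like .top(2)
top_two = sorted(((n, w) for w, n in counts.items()), reverse=True)[:2]
print(top_two)
```

Because the pairs are ordered by count first and word second, ties on the count are broken alphabetically (largest first), which is also why the Spark version swaps to `(count, word)` before calling `top`.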


## Example

These examples are running locally on my laptop.

The data file (example.txt) is loaded into a _local_ Resilient Distributed Dataset (__RDD__).

If my Spark Context (`sc`) were created on a Spark cluster, the data would have been _partitioned_ across the worker nodes.
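
As a rough picture of what partitioning means, the sketch below splits a list into contiguous slices. The helper `split_into_partitions` is hypothetical and is not how Spark itself assigns records to workers; it only shows the idea that each worker would hold one slice.

```python
def split_into_partitions(items, num_partitions):
    """Split a list into num_partitions contiguous, roughly equal slices."""
    n = len(items)
    # Slice boundaries, e.g. [0, 3, 6, 10] for 10 items in 3 partitions
    bounds = [n * i // num_partitions for i in range(num_partitions + 1)]
    return [items[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]


partitions = split_into_partitions(list(range(10)), 3)
print(partitions)
```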

## Example

(Py)spark evaluates expressions lazily. "The transformations are only computed when an action requires a result to be returned to the driver program." [source](http://spark.apache.org/docs/1.2.0/programming-guide.html#rdd-operations)

In [None]:
%%time
rdd = sc.parallelize(range(10**8)).map(lambda x: float(x) ** 2)


In [None]:
%%time
_ = rdd.count()
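
The same pattern can be seen without Spark. In this plain-Python sketch (an analogy, not Spark's mechanism), `map` builds a lazy iterator and no squaring happens until something consumes it, which plays the role of the action. The `calls` list is only there to record when work is done.

```python
calls = []


def square(x):
    calls.append(x)  # record that work actually happened
    return float(x) ** 2


# Building the pipeline does no work: map() returns a lazy iterator in Python 3
lazy = map(square, range(5))
work_before = len(calls)

# Consuming the iterator plays the role of an action such as count()
count = sum(1 for _ in lazy)
work_after = len(calls)

print(work_before, count, work_after)
```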

## Spark vs Pyspark?

Spark is written in Scala. The 'native' API is in Scala.

Pyspark is a very lightweight wrapper around the native API. (You can see its implementation [here](https://github.com/apache/spark/tree/master/python/pyspark).)

![](http://i.imgur.com/YlI8AqEl.png)

[source](https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals)

## Spark vs Pyspark?

__Key differences:__

* Python (unlike Scala) is dynamically typed. (RDDs can hold objects of multiple types!)
* Pyspark sometimes lags behind Spark in feature releases.

(There's also a Java API in case you really hate life.)

## Spark vs Pyspark?

It must be slower, right?

> Spark’s core developers have worked extensively to bridge the performance gap between JVM languages and Python.

> In particular, PySpark can now run on PyPy to leverage the just-in-time compiler. (Up to 50x speedup)

> The way Python processes communicate with the main Spark JVM programs have also been redesigned to enable worker reuse.

[source](http://radar.oreilly.com/2015/02/recent-performance-improvements-in-apache-spark.html)



## How is this better than Hadoop?

__Major difference:__

Spark keeps data in worker memory, while Hadoop tends to keep data on disk.

According to the Spark webpage, it can run "100x faster than Hadoop by exploiting in memory computing and other optimizations".


## How is this better than Hadoop?

> ### Spark officially sets a new record in large-scale sorting

> Using Spark on 206 EC2 machines, we sorted 100 TB of data on disk in 23 minutes. In comparison, the previous world record set by Hadoop MapReduce used 2100 machines and took 72 minutes. This means that Spark sorted the same data 3X faster using 10X fewer machines.

[source](http://databricks.com/blog/2014/11/05/spark-officially-sets-a-new-record-in-large-scale-sorting.html)

## How is this better than Hadoop?

__Also:__

__RDD__ is a key development: RDDs provide an "immutable resilient distributed collection of records".

> Unlike existing storage abstractions for clusters, which require data replication for fault tolerance, RDDs offer an API based on coarse-grained transformations that lets them recover data efficiently using lineage.

See: [Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing](http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf), [Spark: Cluster Computing with Working Sets](https://amplab.cs.berkeley.edu/wp-content/uploads/2011/06/Spark-Cluster-Computing-with-Working-Sets.pdf), [Spark Research](https://spark.apache.org/research.html)
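
The idea of recovering data from lineage rather than from replicas can be sketched in a few lines of plain Python. `LineageNode` is a toy class invented for this example, not part of Spark: each dataset remembers its parent and the function applied to it, so a lost partition can be rebuilt by replaying that function.

```python
class LineageNode:
    """A dataset described by its parent and the function applied to it."""

    def __init__(self, source=None, parent=None, fn=None):
        self.source = source  # list of partitions, set only on the root
        self.parent = parent  # the dataset this one was derived from
        self.fn = fn          # element-wise transformation from the parent
        self.cache = {}       # partitions that are currently materialized

    def map(self, fn):
        # Record the transformation instead of running it
        return LineageNode(parent=self, fn=fn)

    def compute(self, index):
        """Return partition `index`, rebuilding it from lineage if missing."""
        if index not in self.cache:
            if self.parent is None:
                self.cache[index] = list(self.source[index])
            else:
                parent_part = self.parent.compute(index)
                self.cache[index] = [self.fn(x) for x in parent_part]
        return self.cache[index]


root = LineageNode(source=[[1, 2], [3, 4]])
squared = root.map(lambda x: x * x)

first = squared.compute(1)
squared.cache.clear()           # simulate losing the computed partition
recovered = squared.compute(1)  # rebuilt from the parent, no replica needed
print(first, recovered)
```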

## How is this better than Hadoop?

__Also:__
    
Spark provides 80+ high(er)-level, functional-style operators beyond simple "map" and "reduce". (Not even to mention high-level tools Spark Streaming, Spark SQL, MLlib, and GraphX.)

For example:

* count
* countApprox
* flatMap
* filter
* groupBy
* map
* reduce
* reduceByKey
* sample
* sortBy
* union

## How is this better than Hadoop?

__Native Python Code:__

* Unlike Hive/Pig

__No Java:__

* Unlike native Hadoop

__High(er)-level operators:__

* Unlike mrjob

__Functional style:__

> Spark imitates Scala’s collections API and functional style, which is a boon to Java and Scala developers, but also somewhat familiar to developers coming from Python. [source](http://blog.cloudera.com/blog/2014/03/why-apache-spark-is-a-crossover-hit-for-data-scientists/)




[pyspark-pictures](http://nbviewer.ipython.org/github/jkthompson/pyspark-pictures/blob/master/pyspark-pictures.ipynb) is a handy help for the Spark API:

```
rdd1.cartesian(rdd2)
```

![](http://nbviewer.ipython.org/github/jkthompson/pyspark-pictures/blob/master/images/pyspark-page17.svg)
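
For readers without the picture, a plain-Python equivalent of `cartesian` is `itertools.product`: every element of the first collection is paired with every element of the second. The lists `left` and `right` are made-up inputs, and the ordering of a real RDD result may differ because it depends on partitioning.

```python
from itertools import product

left = [1, 2]
right = ["a", "b"]

# Every element of `left` paired with every element of `right`
pairs = list(product(left, right))
print(pairs)
```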

## Installing (Py)Spark locally

For Mac users using [Homebrew](http://brew.sh/):

```
$ brew install apache-spark
```

Install [Java SDK](http://www.oracle.com/technetwork/java/javase/downloads/index.html)

## Launching the Pyspark REPL


```
$ IPYTHON=1 pyspark
```

You should see:

```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.1
      /_/

Using Python version 2.7.6 (default, Sep  9 2014 15:04:36)
SparkContext available as sc.
>>>
```

## Launching Pyspark in an IPython notebook

```
$ IPYTHON_OPTS="notebook --matplotlib inline" pyspark
```

This creates a special IPython notebook that is initialized with a SparkContext object called `sc`:

In [None]:
sc

(You can also create [IPython profiles](http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/) [to automate some of this](http://ramhiser.com/2015/02/01/configuring-ipython-notebook-support-for-pyspark/).)

(On Spark 2.0 and later, the `IPYTHON` and `IPYTHON_OPTS` variables were removed; set `PYSPARK_DRIVER_PYTHON` and `PYSPARK_DRIVER_PYTHON_OPTS` instead.)

These commands start Pyspark in __local__ mode, as opposed to __cluster__ mode.

The exact same code can be run in local and cluster modes! It just depends on how you initialize your Spark session.

# Getting Data

In [None]:
# Load a Python iterable into an RDD

sc.parallelize(range(10))

In [None]:
# Load a text file

sc.textFile("example.txt") # Each line is a separate element in the RDD

In [None]:
# Load text files

sc.textFile("example.txt,example2.txt").collect()[-1001:-991]

`sc.textFile` can also be used to load text files from Amazon S3.

`SparkContext.wholeTextFiles`...

> ...lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with <code>textFile</code>, which would return one record per line in each file
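
The difference between the two record shapes can be shown without Spark. The sketch below writes two small files to a temporary directory and builds both views by hand; the file names and contents are invented, and Spark would report full file paths as keys where this example uses bare names.

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as folder:
    # Two small, made-up text files
    for name, text in [("a.txt", "one\ntwo\n"), ("b.txt", "three\n")]:
        with open(os.path.join(folder, name), "w") as handle:
            handle.write(text)

    names = sorted(os.listdir(folder))

    # textFile-style: one record per line, across all files
    line_records = []
    for name in names:
        with open(os.path.join(folder, name)) as handle:
            line_records.extend(line.rstrip("\n") for line in handle)

    # wholeTextFiles-style: one (filename, content) pair per file
    file_records = []
    for name in names:
        with open(os.path.join(folder, name)) as handle:
            file_records.append((name, handle.read()))

print(line_records)
print(file_records)
```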


`SparkContext.newAPIHadoopRDD`

> PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both ‘new’ and ‘old’ Hadoop MapReduce APIs.

For example, [Cassandra](https://github.com/apache/spark/blob/master/examples/src/main/python/cassandra_inputformat.py).

# Saving Data

`rdd.collect()` converts an RDD object into a Python list on the driver machine.

`rdd.saveAsTextFile()` saves an RDD as text files, writing the string representation of each element. See also `rdd.saveAsPickleFile()`.

`rdd.saveAsNewAPIHadoopDataset()` saves an RDD object to a Hadoop data source (e.g. HDFS, [Cassandra](https://github.com/Parsely/pyspark-cassandra)).

# Manipulating Data

Sort last three presidents by last name

In [None]:
rdd = sc.parallelize(["Barack Hussein Obama", "George Walker Bush", "William Jefferson Clinton"])

rdd.sortBy(keyfunc=lambda k: k.split(" ")[-1]).collect()
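
The key function works the same way as the `key` argument of Python's built-in `sorted`, so the expected ordering can be checked locally without Spark:

```python
presidents = [
    "Barack Hussein Obama",
    "George Walker Bush",
    "William Jefferson Clinton",
]

# Sort on the last whitespace-separated token, i.e. the last name
by_last_name = sorted(presidents, key=lambda k: k.split(" ")[-1])
print(by_last_name)
```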

# Manipulating Data

Join Datasets

In [None]:
rdd1 = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
rdd2 = sc.parallelize([("a", 6), ("b", 7), ("b", 8), ("d", 9)])

In [None]:
rdd1.join(rdd2).collect()

In [None]:
rdd1.fullOuterJoin(rdd2).collect()
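
To see what the two joins should return, here is a plain-Python sketch of their semantics on the same pairs. The helpers `group`, `inner_join`, and `full_outer_join` are written for this example only; they sort their output for readability, whereas Spark makes no ordering promise.

```python
from collections import defaultdict

left = [("a", 1), ("b", 2), ("c", 3)]
right = [("a", 6), ("b", 7), ("b", 8), ("d", 9)]


def group(pairs):
    """Collect the values seen for each key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def inner_join(left_pairs, right_pairs):
    """Keep only keys present on both sides; pair up every combination."""
    l, r = group(left_pairs), group(right_pairs)
    return sorted(
        (k, (lv, rv)) for k in l.keys() & r.keys() for lv in l[k] for rv in r[k]
    )


def full_outer_join(left_pairs, right_pairs):
    """Keep every key; fill the missing side with None."""
    l, r = group(left_pairs), group(right_pairs)
    rows = []
    for k in sorted(l.keys() | r.keys()):
        for lv in l.get(k, [None]):
            for rv in r.get(k, [None]):
                rows.append((k, (lv, rv)))
    return rows


inner = inner_join(left, right)
outer = full_outer_join(left, right)
print(inner)
print(outer)
```

Note that the duplicated key `"b"` on the right produces two output rows, and that keys found on only one side (`"c"` and `"d"`) appear only in the full outer join.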

# Manipulating Data

In [None]:
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import StructField, StructType, FloatType
import pandas as pd


Let's load in the [Wake County Real Estate Data](http://www.wakegov.com/tax/realestate/redatafile/pages/default.aspx).

In [None]:
raw_real_estate = sc.textFile("all.txt")
print(raw_real_estate.take(1)[0][:250])


<img src="http://i.imgur.com/EiPTMML.gif" title="source: imgur.com" width="250"/>

# Manipulating Data

In [None]:
from pyspark.sql import SparkSession, Row
import pandas as pd

# Initialize SparkSession (replaces SQLContext)
spark = SparkSession.builder.appName("RealEstateApp").getOrCreate()




In [None]:
# Check if data is loaded correctly
print("Initial data load sample:")
print(raw_real_estate.take(5))  # Print a sample to verify loading

# Map raw data into a dictionary format for each row
wake_county_real_estate = raw_real_estate.map(lambda row:
    dict(
     owner = row[0:35].strip().title(),
     last_name = row[0:35].strip().title().split(",")[0],
     address = row[70:105].strip().title(),
     sale_price = int(row[273:284].strip() or -1),
     value = int(row[305:316].strip() or -1),
     use = int(row[653:656].strip() or -1),
     heated_area = int(row[471:482].strip() or -1),
     year_built = int(row[455:459].strip() or -1),
     height = row[509:510].strip(),
    ))

# Check the mapped data to verify field extraction
print("Mapped data sample:")
print(wake_county_real_estate.take(5))

# Convert the mapped data to an RDD of Rows
wake_county_real_estate_rdd = wake_county_real_estate.map(lambda d: Row(**d))

# Create a DataFrame from the RDD
wake_df = spark.createDataFrame(wake_county_real_estate_rdd)

# Register the DataFrame as a SQL temporary view
wake_df.createOrReplaceTempView("wake")

# Step 1: Check if data exists in the DataFrame by querying all rows without filters
print("Sample data from 'wake' table (no filters):")
wake_df.show(10)

# Step 2: Apply each filter step-by-step to identify which might be causing issues

# First, filter by `use = 66` (for church buildings)
print("Sample data where use = 66:")
result_df_use = spark.sql("SELECT * FROM wake WHERE use = 66 LIMIT 10")
result_df_use.show()

# Then add `value > 4000000`
print("Sample data where use = 66 and value > 4000000:")
result_df_value = spark.sql("SELECT * FROM wake WHERE use = 66 AND value > 4000000 LIMIT 10")
result_df_value.show()

# Finally, add the filter `owner LIKE '%Church%'`
print("Final query with all filters:")
result_df_final = spark.sql("""
    SELECT DISTINCT owner, address, year_built, value
    FROM wake
    WHERE value > 4000000 AND
          use = 66 AND
          owner LIKE '%Church%'
""")
result_df_final.show()

# If needed, convert to pandas for further analysis
pd_df = result_df_final.toPandas()
print(pd_df)
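
The mapping step above relies on fixed-width slicing: each field lives at known character offsets in the row. The sketch below tries the same technique on a synthetic row. The `LAYOUT` offsets and the sample row are hypothetical and much shorter than the real Wake County layout.

```python
# Hypothetical layout: (start, end) character offsets, NOT the real file's
LAYOUT = {"owner": (0, 20), "sale_price": (20, 28), "year_built": (28, 32)}


def parse_row(row):
    """Slice a fixed-width row into a dictionary of cleaned fields."""
    record = {}
    for name, (start, end) in LAYOUT.items():
        record[name] = row[start:end].strip()
    record["owner"] = record["owner"].title()
    # Blank numeric fields become -1, mirroring `int(... or -1)` above
    for name in ("sale_price", "year_built"):
        record[name] = int(record[name] or -1)
    return record


# Synthetic row: padded owner, right-aligned price, blank year
row = "DOE, JANE".ljust(20) + "250000".rjust(8) + " " * 4
parsed = parse_row(row)
print(parsed)
```

Using `-1` as the placeholder keeps the column numeric, at the cost of needing filters such as `year_built > 1900` later to exclude the missing values.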

# Manipulating Data

Who owns the most expensive church buildings in Raleigh?

In [None]:
# Check the unique values in the 'use' field to confirm expected values
spark.sql("SELECT DISTINCT use FROM wake").show()

# Check sample values in the 'value' field to confirm it contains numeric data
spark.sql("SELECT value FROM wake ORDER BY value DESC LIMIT 10").show()

# Check owners to ensure they contain the keyword 'Church' where expected
spark.sql("SELECT owner FROM wake WHERE owner LIKE '%Church%' LIMIT 10").show()


# Manipulating Data

What is the 43rd richest American's house worth?

In [None]:
# Run the SQL query to find the maximum value for the specified owner
result = spark.sql("""
    SELECT MAX(value) AS price
    FROM wake
    WHERE owner LIKE 'Goodnight, James H% & Ann B%'
    GROUP BY last_name
""")

# Collect the result and check if it's non-empty
price_data = result.collect()

# Safely access the price if a result exists
if price_data:
    max_price = price_data[0]['price']
    print("Max Price:", max_price)
else:
    print("No results found for the specified owner.")


(We could have done these same queries with the 'native' Spark functional method chaining.)

In [None]:
# Execute the SQL query to get the maximum value for the specified church
result = spark.sql("""
    SELECT MAX(value) AS price
    FROM wake
    WHERE owner = 'Crosspointe Church At Cary'
""")

# Collect the result and check if it's non-empty
price_data = result.collect()

# Access the price if a result exists
if price_data:
    max_price = price_data[0]['price']
    print("Max Price for Crosspointe Church At Cary:", max_price)
else:
    print("No results found for the specified church.")
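
As an illustration of the functional style, the same filter-then-aggregate logic looks like this in plain Python. The `rows` list is made up and only mimics the shape of the dictionaries built earlier; on the RDD of dictionaries, the analogous chain would presumably use the RDD methods `filter`, `map`, and `max`.

```python
# Hypothetical rows shaped like the dictionaries in wake_county_real_estate
rows = [
    {"owner": "Example Church", "value": 5000000},
    {"owner": "Example Church", "value": 7500000},
    {"owner": "Someone Else", "value": 900000},
]

# WHERE owner = ... becomes a filter; SELECT value becomes a map
matching = filter(lambda d: d["owner"] == "Example Church", rows)
values = map(lambda d: d["value"], matching)

# MAX(value); default=None covers the case where no row matches
max_price = max(values, default=None)
print(max_price)
```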


# Manipulating Data

Again, if you wanted to load terabytes of real estate data from HDFS or S3 (for example), you could run this exact same code on a Spark cluster.

<img src="http://41.media.tumblr.com/7b2b5be9520a1f7a5deebc49a78bc5ce/tumblr_n6ck5rrrBy1td9006o1_1280.jpg" width=500>

## Data Frames! (Coming Soon in 1.3)

Constructs a DataFrame from the users table in Hive.

```python
# `context` is assumed to be a SQLContext/HiveContext, not the SparkContext `sc`
users = context.table("users")
```

Create a new DataFrame that contains “young users” only

```python
young = users.filter(users.age < 21)
```
    
Count the number of young users by gender
    
```python
young.groupBy("gender").count()
```

## Data Frames! (Coming Soon in 1.3)

From JSON files in S3

```python
logs = context.load("s3n://path/to/data.json", "json")
```

Join young users with another DataFrame called logs
    
```python
young.join(logs, logs.userId == users.userId, "left_outer")
```


[source](https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html)

## Data Frames! (Coming Soon in 1.3)

<center><img src="http://blogs.edweek.org/edweek/the_startup_blog/assets_c/2014/02/a78ae76da19c1a0f9e0e9b2f7e6229e70bd36cf7bc5b2f29b5f8900face50234%5B1%5D-thumb-autox384-6965.jpg" width="200"></center>

Convert Spark DataFrame to Pandas

```
pandas_df = young.toPandas()
```

Create a Spark DataFrame from Pandas

```
spark_df = context.createDataFrame(pandas_df)
```

# Machine Learning with (Py)Spark

In [None]:
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.regression import LabeledPoint  # LabeledPoint is defined here
from pyspark.mllib import feature
from pyspark.mllib.stat import Statistics
from random import choice

Subset to Apartment Buildings and Office Buildings

In [None]:
subset = wake_county_real_estate.filter(lambda d:
                                        d["use"] in [7, 34])
subset = subset.filter(lambda d: d["heated_area"] > 0
              and d["year_built"] > 1900) \
              .map(lambda d: LabeledPoint(
                                1 if d["use"] == 7 else 0,
                                [d["year_built"],
                                 d["heated_area"]]))

subset.take(2)

