# Learning Spark Basics - Demos
### Yu Feng
---

### Setting Environment Variables

In [1]:
import glob
import os
import sys

def configure_spark(spark_home=None, pyspark_python=None):
    spark_home = spark_home or "/usr/local/spark"
    os.environ['SPARK_HOME'] = spark_home

    # Add the PySpark directories to the Python path.
    # The Py4J version bundled with Spark differs between releases, so look the zip up
    # instead of hard-coding it (falling back to the original py4j-0.10.1 file name).
    lib_dir = os.path.join(spark_home, 'python', 'lib')
    py4j_zips = glob.glob(os.path.join(lib_dir, 'py4j-*-src.zip'))
    py4j_zip = py4j_zips[0] if py4j_zips else os.path.join(lib_dir, 'py4j-0.10.1-src.zip')

    sys.path.insert(1, os.path.join(spark_home, 'python'))
    sys.path.insert(1, os.path.join(spark_home, 'python', 'build'))
    sys.path.insert(1, os.path.join(lib_dir, 'pyspark.zip'))
    sys.path.insert(1, py4j_zip)

    # Export the Py4J path so that processes launched from this kernel can find it
    os.environ['PYTHONPATH'] = py4j_zip

    # If PySpark isn't specified, use the currently running Python binary
    pyspark_python = pyspark_python or sys.executable
    os.environ['PYSPARK_PYTHON'] = pyspark_python

configure_spark()

In [2]:
# JAVA_HOME must be set before the SparkContext (and its JVM) is started
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-1.7.0-openjdk-amd64"

# Demo 1: Programming with RDDs

RDDs are Spark's original, low-level API and the most traditional way to program Spark.

In [3]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App") 
sc = SparkContext(conf = conf)

## RDD (resilient distributed dataset)
* Split into multiple partitions
* Partitions can be computed on different nodes of the cluster
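To make the idea of partitions concrete, here is a plain-Python sketch (no Spark required) that splits a collection into contiguous partitions, processes each partition independently, and combines the partial results. The helper names `split_into_partitions` and `partitioned_sum` are hypothetical, and Spark's own slicing in `parallelize` may differ in detail.

```python
def split_into_partitions(items, num_partitions):
    """Split a list into num_partitions contiguous slices of nearly equal size."""
    n = len(items)
    return [items[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]


def partitioned_sum(items, num_partitions):
    """Sum each partition independently, then combine the partial sums."""
    # On a cluster, each partial sum could be computed on a different node
    partial_sums = [sum(part) for part in split_into_partitions(items, num_partitions)]
    # The short list of partial results is then combined in one place (the driver)
    return sum(partial_sums)


print(split_into_partitions(list(range(10)), 3))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
print(partitioned_sum(list(range(10)), 3))        # 45
```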

In [4]:
# Create an RDD from external dataset
lines = sc.textFile("alice30.txt") 

In [5]:
# Create an RDD from an existing collection
localLines = sc.parallelize(["pandas", "i like pandas"]) 

## RDD Operations: Actions and Transformations

### - Actions: compute a result from an RDD and either return it to the driver program or save it to an external storage system (e.g. HDFS)

* .count() - Count the number of items in this RDD
* .first() - Return the first item in this RDD
* .take(n) - Return the first n items
* .collect() - Return all items to the driver; avoid it on large datasets, since the whole RDD must fit in driver memory
* .reduce(f) - Combine all items with a binary function f, e.g. summing them
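Because `.reduce()` combines the items within each partition first and then combines the per-partition results, the function passed to it should be commutative and associative. The plain-Python sketch below models that two-level reduction with a hypothetical helper `partitioned_reduce` (not part of PySpark) and shows how a non-associative function such as subtraction gives answers that depend on the partitioning.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(func, items, num_partitions):
    """Hypothetical model of RDD.reduce: reduce each partition, then reduce the partial results."""
    n = len(items)
    parts = [items[i * n // num_partitions:(i + 1) * n // num_partitions]
             for i in range(num_partitions)]
    # Empty partitions contribute nothing
    partials = [reduce(func, part) for part in parts if part]
    return reduce(func, partials)


nums = [1, 2, 3, 4]
print(partitioned_reduce(add, nums, 1))  # 10
print(partitioned_reduce(add, nums, 2))  # 10
print(partitioned_reduce(sub, nums, 1))  # -8
print(partitioned_reduce(sub, nums, 2))  # 0
```

Addition gives 10 regardless of the number of partitions, while subtraction gives -8 with one partition and 0 with two, which is why `lambda x, y: x + y` is a safe choice for `.reduce()` and `lambda x, y: x - y` is not.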

In [6]:
print lines.count()
print lines.first()

3599
                ALICE'S ADVENTURES IN WONDERLAND


In [7]:
for line in lines.take(10): 
    print line

                ALICE'S ADVENTURES IN WONDERLAND

                          Lewis Carroll

               THE MILLENNIUM FULCRUM EDITION 3.0




                            CHAPTER I


In [8]:
nums = sc.parallelize([1, 2, 3, 4]) 
# Use "total" rather than "sum" to avoid shadowing Python's built-in sum()
total = nums.reduce(lambda x, y: x + y)
print total

10


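### - Transformations: construct a new RDD from an existing one. Transformations are lazy: Spark only computes them when an action needs the result

* .filter() - takes a function and returns a new RDD with only the elements for which the function returns True
* .map() - takes a function and returns a new RDD with the function applied to each element
* .distinct(), .union(), .intersection(), .subtract() - set-like operations (note that .union() keeps duplicates)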

![Image1](images/mathsetop.PNG)
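The set-like transformations do not all behave like mathematical sets. The plain-Python sketch below mimics their semantics on lists: `distinct()` and `intersection()` drop duplicates, while `union()` is a plain concatenation and `subtract()` keeps repeated elements of the left side. The helper names are hypothetical, and the results are sorted only to make them deterministic; Spark does not guarantee any output order.

```python
def rdd_distinct(xs):
    # Like RDD.distinct(): each element once
    return sorted(set(xs))


def rdd_union(xs, ys):
    # Like RDD.union(): plain concatenation, duplicates are kept
    return sorted(xs + ys)


def rdd_intersection(xs, ys):
    # Like RDD.intersection(): elements present in both, without duplicates
    return sorted(set(xs) & set(ys))


def rdd_subtract(xs, ys):
    # Like RDD.subtract(): every element of xs (duplicates included) that is not in ys
    ys_set = set(ys)
    return sorted(x for x in xs if x not in ys_set)


rdd1 = ["coffee", "coffee", "panda", "monkey", "tea"]
rdd2 = ["coffee", "money", "kitty"]

print(rdd_distinct(rdd1))            # ['coffee', 'monkey', 'panda', 'tea']
print(rdd_union(rdd1, rdd2))         # ['coffee', 'coffee', 'coffee', 'kitty', 'money', 'monkey', 'panda', 'tea']
print(rdd_intersection(rdd1, rdd2))  # ['coffee']
print(rdd_subtract(rdd1, rdd2))      # ['monkey', 'panda', 'tea']
```

Because `union()` keeps duplicates, the union of the "Queen" and "King" lines later in this demo has 74 + 62 = 136 lines, and any line mentioning both words appears twice.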

In [9]:
# filter() returns an entirely new RDD; the original `lines` RDD is unchanged
pythonLines = lines.filter(lambda line: "Queen" in line)

#for line in pythonLines.collect():
#    print line

print pythonLines.count()

74


In [10]:
kingLines = lines.filter(lambda line: "King" in line)
# union() does not remove duplicates: lines containing both "Queen" and "King" are counted twice
king_queen_lines = pythonLines.union(kingLines)

print king_queen_lines.count()

136


In [11]:
def containsKing(s):
    return "King" in s

kingLines = lines.filter(containsKing)
print kingLines.count()

62


In [12]:
nums = sc.parallelize([1, 2, 3, 4]) 
squared = nums.map(lambda x: x * x).collect() 
print squared

[1, 4, 9, 16]


In [13]:
sc.stop() 
del sc

# Demo 2: Programming with DataFrames

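A DataFrame holds "tabular" data: a distributed collection of rows organized into named columns, like a table in a relational database.

## DataFrame in Spark
DataFrame queries are planned by Spark SQL's Catalyst optimizer and executed in the JVM, so they typically run much faster than equivalent RDD code, especially from Python, where RDD functions run in separate Python worker processes.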


![Image2](images/dataframes-faster.png)

In [14]:
# Since Spark 2.0.0, SparkSession is the entry point used to create DataFrames, register
# DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files.

from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("Python Spark SQL basic example")
         # Placeholder key/value copied from the Spark docs, not a real Spark setting
         .config("spark.some.config.option", "some-value")
         .getOrCreate())

In [15]:
# spark is an existing SparkSession; header=True takes the column names from the first line
df = spark.read.csv("work-flow/tweet.csv", header=True)
# Print the row count, then display the first 10 rows of the DataFrame to stdout
print 'Total: %d' % df.count()
df.show(10)

Total: 3118
+---------+-----------+--------------------+-----+--------------------+
| Latitude|  Longitude|                Time|Label|            Location|
+---------+-----------+--------------------+-----+--------------------+
|55.864237|  -4.251806|Mon Oct 05 08:20:...|    1|   Glasgow  Scotland|
|33.748995| -84.387982|Mon Oct 05 08:20:...|    1|         Atlanta  GA|
|33.448377|-112.074037|Mon Oct 05 08:20:...|    1|     Phoenix Arizona|
|34.000710| -81.034814|Mon Oct 05 08:20:...|    1|Columbia  South C...|
|40.569789| -79.764770|Mon Oct 05 08:20:...|    1|  New Kensington  PA|
|33.748995| -84.387982|Mon Oct 05 08:20:...|    1|         Atlanta  GA|
|35.689197|  51.388974|Mon Oct 05 08:20:...|    1|         IRAN TEHRAN|
|35.125801|-117.985904|Mon Oct 05 08:20:...|    1|  California City CA|
|51.621440|  -3.943646|Mon Oct 05 08:20:...|    1|             swansea|
|33.748995| -84.387982|Mon Oct 05 08:20:...|    1|   Atlanta  GA      |
+---------+-----------+--------------------+-----+--------------------+

In [16]:
# spark, df are from the previous example
# Print the schema in a tree format. Every column is a string because the CSV
# was read without inferSchema=True
df.printSchema()

root
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Label: string (nullable = true)
 |-- Location: string (nullable = true)
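`spark.read.csv` only infers column types when `inferSchema=True` is passed, which costs an extra pass over the data; otherwise every column is read as a string, as in the schema above. The sketch below is a simplified, hypothetical model of that kind of inference in plain Python (try integer, then double, else string); Spark's real inference supports more types and edge cases. The two sample rows are taken from the `df.show(10)` output.

```python
def infer_type(values):
    """Simplified type inference: 'integer' if all values parse as int,
    else 'double' if all parse as float, else 'string'."""
    def parses_as(cast, value):
        try:
            cast(value)
            return True
        except ValueError:
            return False

    # Treat empty strings and None as missing values
    present = [v for v in values if v not in ("", None)]
    if not present:
        return "string"
    if all(parses_as(int, v) for v in present):
        return "integer"
    if all(parses_as(float, v) for v in present):
        return "double"
    return "string"


rows = [
    {"Latitude": "55.864237", "Label": "1", "Location": "Glasgow  Scotland"},
    {"Latitude": "33.748995", "Label": "1", "Location": "Atlanta  GA"},
]
schema = {col: infer_type([row[col] for row in rows]) for col in rows[0]}
print(schema)  # {'Latitude': 'double', 'Label': 'integer', 'Location': 'string'}
```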



In [17]:
# Select only the "Location" column
df.select('Location').show()

+--------------------+
|            Location|
+--------------------+
|   Glasgow  Scotland|
|         Atlanta  GA|
|     Phoenix Arizona|
|Columbia  South C...|
|  New Kensington  PA|
|         Atlanta  GA|
|         IRAN TEHRAN|
|  California City CA|
|             swansea|
|   Atlanta  GA      |
|Vila   Velha   ES...|
|             USA  ma|
|         IRAN TEHRAN|
|         Phoenix  AZ|
|              Lagos |
| Cavite  Philippines|
|      Barrington  RI|
|               Miami|
|                U.K.|
|      Patiala Punjab|
+--------------------+
only showing top 20 rows



In [18]:
# Count tweets by Location, most frequent first
df.groupBy('Location').count().sort('count', ascending=False).show()

+--------------------+-----+
|            Location|count|
+--------------------+-----+
|              London|   86|
|        Columbia  SC|   34|
|Querétaro Arteaga...|   26|
|Florence  South C...|   25|
|         Atlanta  GA|   24|
|    Myrtle Beach  SC|   24|
|        New York  NY|   24|
|       Charlotte  NC|   22|
|      Wilmington  NC|   22|
|         Norfolk  VA|   20|
|      Charleston  SC|   18|
|      South Carolina|   18|
|          Birmingham|   17|
|               World|   17|
|     London  England|   17|
|         Houston  TX|   16|
|        Greenbelt MD|   15|
| Manchester  England|   15|
|               Earth|   15|
|      Lagos  Nigeria|   14|
+--------------------+-----+
only showing top 20 rows
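`groupBy('Location').count()` produces one row per distinct location holding the number of rows in that group, and `.sort('count', ascending=False)` orders the groups by that count. For intuition, the same computation on a small in-memory list looks like this in plain Python. The sample values are hypothetical, and `Counter.most_common` breaks ties by first appearance, whereas Spark does not guarantee an order among equal counts.

```python
from collections import Counter

# Hypothetical sample of the Location column
locations = ["London", "Atlanta  GA", "London", "Columbia  SC", "London", "Atlanta  GA"]

# Plain-Python counterpart of df.groupBy('Location').count().sort('count', ascending=False)
location_counts = Counter(locations).most_common()
for location, count in location_counts:
    print("%-15s %d" % (location, count))
```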



In [19]:
# Select by location
df.filter(df['Location'] == 'Manchester  England').show()

# Pandas-like syntax
df[df['Location'] == 'Manchester  England'].show()

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("spatialdata")
sqlDF = spark.sql("SELECT * FROM spatialdata WHERE Location == 'Manchester  England' ")
sqlDF.show()

+---------+----------+--------------------+-----+-------------------+
| Latitude| Longitude|                Time|Label|           Location|
+---------+----------+--------------------+-----+-------------------+
|53.480759| -2.242631|Mon Oct 05 08:21:...|    1|Manchester  England|
|53.480759| -2.242631|Mon Oct 05 08:22:...|    1|Manchester  England|
|53.480759| -2.242631|Mon Oct 05 08:22:...|    1|Manchester  England|
|53.480759| -2.242631|Mon Oct 05 08:23:...|    1|Manchester  England|
|53.480759| -2.242631|Mon Oct 05 08:24:...|    1|Manchester  England|
|53.480759| -2.242631|Mon Oct 05 08:28:...|    1|Manchester  England|
|53.480759| -2.242631|Mon Oct 05 08:30:...|    1|Manchester  England|
|53.480759| -2.242631|Mon Oct 05 08:32:...|    1|Manchester  England|
|53.480759| -2.242631|Mon Oct 05 08:33:...|    1|Manchester  England|
|53.480759| -2.242631|Mon Oct 05 08:33:...|    1|Manchester  England|
|53.480759| -2.242631|Mon Oct 05 08:34:...|    1|Manchester  England|
|53.480759| -2.24263

In [20]:
# Convert Spark DataFrame to Pandas (collects all rows to the driver, so the data must fit in memory)
pandas_df = df.toPandas()
print 'Pandas Dataframe: '
print pandas_df.dtypes

# Create a Spark DataFrame from Pandas
spark_df = spark.createDataFrame(pandas_df)
print 'Spark Dataframe: '
print spark_df

Pandas Dataframe: 
Latitude     object
Longitude    object
Time         object
Label        object
Location     object
dtype: object
Spark Dataframe: 
DataFrame[Latitude: string, Longitude: string, Time: string, Label: string, Location: string]


In [21]:
spark.stop()
del spark

# Demo 3: Geospatial Analysis with Apache Spark

Spatial functionality is not supported by standard Apache Spark distributions; it comes from third-party packages.

![](images/spark_packagess.png)

## SpatialSpark (88 🌟 on GitHub)

SpatialSpark aims to provide efficient spatial operations using Apache Spark. It can be used either as a Spark library for spatial extensions or as a standalone application for large-scale spatial join operations.

SpatialSpark has been compiled and tested on Spark 2.0.2. It uses the well-known JTS library for geometry operations and for index data structures.

Reference: 
1. http://simin.me/projects/spatialspark/
2. https://github.com/syoummer/SpatialSpark
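Large-scale spatial joins are commonly built as filter-and-refine: a spatial index cheaply narrows down candidate pairs, then an exact geometric test discards false candidates. The sketch below shows that idea on a single machine in plain Python, using a uniform grid as the index and joining points with axis-aligned boxes. It is not SpatialSpark's API, and SpatialSpark's actual partitioning and JTS-based indexing may work differently; `grid_cell` and `spatial_join` are hypothetical names.

```python
from collections import defaultdict


def grid_cell(x, y, cell_size):
    """Integer (column, row) of the grid cell containing (x, y)."""
    return (int(x // cell_size), int(y // cell_size))


def spatial_join(points, boxes, cell_size=1.0):
    """Return sorted (point_id, box_id) pairs where the box contains the point.

    points: {point_id: (x, y)}; boxes: {box_id: (min_x, min_y, max_x, max_y)}.
    """
    # Build the index: register every box in each grid cell it overlaps
    index = defaultdict(list)
    for box_id, (x0, y0, x1, y1) in boxes.items():
        cx0, cy0 = grid_cell(x0, y0, cell_size)
        cx1, cy1 = grid_cell(x1, y1, cell_size)
        for cx in range(cx0, cx1 + 1):
            for cy in range(cy0, cy1 + 1):
                index[(cx, cy)].append(box_id)

    result = []
    for point_id, (x, y) in points.items():
        # Filter step: only boxes registered in the point's cell are candidates
        for box_id in index.get(grid_cell(x, y, cell_size), []):
            x0, y0, x1, y1 = boxes[box_id]
            # Refine step: exact containment test
            if x0 <= x <= x1 and y0 <= y <= y1:
                result.append((point_id, box_id))
    return sorted(result)


boxes = {"A": (0.0, 0.0, 2.0, 2.0), "B": (1.0, 1.0, 3.0, 3.0)}
points = {"p1": (0.5, 0.5), "p2": (1.5, 1.5), "p3": (5.0, 5.0)}
print(spatial_join(points, boxes))  # [('p1', 'A'), ('p2', 'A'), ('p2', 'B')]
```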


## GeoSpark (134 🌟 on GitHub)

GeoSpark is a cluster computing system for processing large-scale spatial data. It extends Apache Spark with a set of out-of-the-box **Spatial Resilient Distributed Datasets (SRDDs)** that efficiently load, process, and analyze large-scale spatial data across machines. GeoSpark provides APIs that let Apache Spark programmers easily develop spatial analysis programs with SRDDs, which have built-in support for **geometrical and spatial queries (Range, K Nearest Neighbors, Join)**.

Reference:
1. http://geospark.datasyslab.org/
2. https://github.com/DataSystemsLab/GeoSpark
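The range and K-nearest-neighbour queries mentioned above have simple single-machine definitions, shown here as brute-force plain Python. This is only a sketch of what the queries compute, not GeoSpark's API: GeoSpark distributes the data across machines and can use spatial indexes instead of scanning every point. Distances are Euclidean in planar coordinates, so for longitude/latitude data they are only an approximation. The function names and sample points are hypothetical.

```python
import heapq
import math


def range_query(points, window):
    """Ids of points inside the window (min_x, min_y, max_x, max_y), boundaries included."""
    min_x, min_y, max_x, max_y = window
    return sorted(pid for pid, (x, y) in points.items()
                  if min_x <= x <= max_x and min_y <= y <= max_y)


def knn_query(points, query, k):
    """Ids of the k points nearest to query, closest first (Euclidean distance)."""
    qx, qy = query
    distances = ((math.hypot(x - qx, y - qy), pid) for pid, (x, y) in points.items())
    return [pid for _, pid in heapq.nsmallest(k, distances)]


points = {"a": (0.0, 0.0), "b": (1.0, 1.0), "c": (2.0, 2.0), "d": (5.0, 5.0)}
print(range_query(points, (0.0, 0.0, 1.5, 1.5)))  # ['a', 'b']
print(knn_query(points, (1.9, 1.9), 2))           # ['c', 'b']
```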


## Magellan (249 🌟 on GitHub)

Magellan is an open-source library for geospatial analytics that uses Spark as the underlying engine. It leverages Catalyst's pluggable optimizer to execute spatial joins efficiently, Spark SQL's operators to express geometric queries in a natural DSL, and PySpark's Python integration to **provide Python bindings**.

Currently supported file formats:
- **ESRI format files**
- **GeoJSON**

Currently supported capabilities:
- **Geometries**: Point, LineString, Polygon, MultiPoint
- **Predicates**: Intersects, Within, Contains
- **Operations**: Intersection

APIs: Scala and Python

Reference:
1. https://de.hortonworks.com/blog/magellan-geospatial-analytics-in-spark/
2. https://github.com/harsha2010/magellan
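For a point and a polygon, the `Within` predicate asks whether the point lies inside the polygon (and `Contains` is the same test seen from the polygon's side). A classic way to answer it is ray casting: cast a horizontal ray from the point and count how many polygon edges it crosses; an odd count means inside. The plain-Python sketch below is that textbook algorithm, not Magellan's implementation; `point_in_polygon` is a hypothetical name, and points lying exactly on an edge may be classified either way.

```python
def point_in_polygon(point, polygon):
    """Ray-casting test: True if point lies inside the simple polygon.

    polygon: list of (x, y) vertices in order; the ring is closed implicitly.
    """
    x, y = point
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Only edges that straddle the horizontal line through the point can be crossed
        if (y1 > y) != (y2 > y):
            # x coordinate where the edge meets that horizontal line
            x_cross = x1 + (y - y1) * (x2 - x1) / float(y2 - y1)
            # Count the crossing if it lies to the right of the point
            if x < x_cross:
                inside = not inside
    return inside


square = [(-2.0, -2.0), (2.0, -2.0), (2.0, 2.0), (-2.0, 2.0)]
print(point_in_polygon((-1.0, -1.0), square))  # True
print(point_in_polygon((3.0, 0.0), square))    # False
```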

In [22]:
from pyspark import SparkConf, SparkContext, SQLContext

conf = SparkConf().setMaster("local").setAppName("My App") 
sc = SparkContext(conf = conf)


In [24]:
#from magellan.types import Point, Polygon
from pyspark.sql import Row, SQLContext

# Access the Magellan JVM package through Py4J. If Py4J cannot resolve a name to a
# JVM class (e.g. because the Magellan JAR was not on the classpath when the
# SparkContext was started), it returns a JavaPackage placeholder, and calling it
# raises "TypeError: 'JavaPackage' object is not callable", as shown below.
magellan = sc._jvm.magellan

print magellan.types.Point(-1.0, -1.0)

#PointRecord = Row("id", "point")

#points = sc.parallelize([
#  (0, Point(-1.0, -1.0)),
#  (1, Point(-1.0, 1.0)),
#  (2, Point(1.0, -1.0))])\
#.map(lambda x: PointRecord(*x))\
#.toDF()

#points.show()

TypeError: 'JavaPackage' object is not callable

In [25]:
sc.stop()
del sc