# Getting Started With Spark using Python

<center>
    <img src="http://spark.apache.org/images/spark-logo.png" width="300"/>
</center>

Spark is an open-source, in-memory application framework for distributed data processing (in a cluster computing environment) and iterative analysis of big data.

It's a multi-language (Scala, Python, Java, R, SQL) engine for data engineering, data science, and machine learning on single-node machines or clusters.

Spark is written in Scala, which compiles to Java bytecode, but you can write Python code that communicates with the Java Virtual Machine (JVM) through a library called Py4J. The Python API (PySpark) is rich, but it can be limiting when a method you need is not available, and the latency of communicating back and forth with the JVM can make code run slower. An exception is the SparkSQL library, whose execution planning engine precompiles queries.

General recommendations:
- If you need to write high-performance or specialized code, consider writing it in Scala.
- Use the built-in ("out of the box") methods as much as possible, and avoid frequent (iterative) calls to Spark methods.

Apache Spark solves the problems encountered with MapReduce (the processing component of Hadoop) by keeping a substantial portion of the required data in memory, avoiding expensive and time-consuming disk I/O.

Learn more about Spark <a href="https://spark.apache.org/">here</a>.

For standalone use, see <a href="https://phoenixnap.com/kb/install-spark-on-ubuntu">this link</a> to install Spark on Ubuntu.

## Objectives

In this notebook, we go over the basics of Apache Spark and PySpark. We start with some concept definitions, then create the SparkContext and SparkSession. Next, we create an RDD and apply some basic transformations and actions. Finally, we present methods for creating a Spark DataFrame, demonstrate the basics of DataFrames and SparkSQL, and show how to close the SparkContext and SparkSession connections.

By the time you finish reading, you will be able to:

- Define RDD, DataFrame, Dataset, SparkContext, and SparkSession
- Create the SparkContext and SparkSession
- Create an RDD and apply some basic transformations and actions on RDDs
- Demonstrate the basics of DataFrames and SparkSQL
- Create a Spark DataFrame using different methods
- Close the SparkContext and SparkSession connections

## Definitions

**RDD:** stands for Resilient Distributed Dataset, Spark's primary data abstraction. An RDD is a collection of elements partitioned across the cluster's nodes so that they can be operated on in parallel. It is fault tolerant, meaning it can recover lost data after a failure, and immutable, meaning it cannot be changed once created.
RDDs support text, sequence files, Avro, Parquet, and Hadoop input format file types. They can also read from local storage, Cassandra, HBase, HDFS, Amazon S3, and other storage systems, in addition to an abundance of relational and NoSQL databases. RDDs are created by:
- Using an external or local file from a Hadoop-supported storage system such as HDFS, Cassandra, HBase, or Amazon S3.
- Applying the `parallelize()` function to an existing collection in the driver program.
- Applying a transformation to an existing RDD to create a new RDD.
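"Partitioned across the cluster's nodes" can be made concrete with a small sketch. The plain-Python code below splits a driver-side collection into a fixed number of contiguous chunks. This is roughly what `parallelize(data, numSlices)` does before the chunks are shipped to executors. `slice_evenly` is a hypothetical helper, not a Spark function, and Spark's exact partition boundaries may differ.

```python
def slice_evenly(data, num_slices):
    """Split a sequence into num_slices contiguous chunks of near-equal size.

    Hypothetical helper illustrating partitioning; not Spark's implementation.
    """
    n = len(data)
    # Boundary i sits at floor(i * n / num_slices), so chunk sizes differ by at most 1
    bounds = [i * n // num_slices for i in range(num_slices + 1)]
    return [list(data[bounds[i]:bounds[i + 1]]) for i in range(num_slices)]


partitions = slice_evenly(range(1, 31), 4)
for index, part in enumerate(partitions):
    print(index, part)
```

On a real RDD, `rdd.glom().collect()` returns one list per partition, so you can compare Spark's actual split with this sketch.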

RDDs provide resilience in Spark through immutability and caching. First, RDDs are always recoverable because the data is immutable. Second, Spark can persist (cache) a dataset in memory across operations; the cache is fault tolerant and always recoverable.
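Immutability makes recovery possible because each RDD also remembers how it was derived from its parent. This derivation record is its lineage. The toy sketch below shows the idea in plain Python, using a hypothetical `ToyRDD` class rather than PySpark. Neither the source data nor the recorded functions ever change, so a lost partition can simply be recomputed.

```python
class ToyRDD:
    """Hypothetical toy stand-in for an RDD (not PySpark): it keeps its
    source partitions plus the ordered functions (lineage) that derive it."""

    def __init__(self, source_partitions, lineage=()):
        self.source = source_partitions   # input data, treated as read-only
        self.lineage = tuple(lineage)     # per-element functions, in order

    def map(self, f):
        # Return a NEW ToyRDD; the parent is left untouched (immutability)
        return ToyRDD(self.source, self.lineage + (f,))

    def compute_partition(self, index):
        # Rebuild one partition by replaying the lineage on its source slice
        part = list(self.source[index])
        for f in self.lineage:
            part = [f(x) for x in part]
        return part


base = ToyRDD([[1, 2], [3, 4], [5, 6]])
derived = base.map(lambda x: x * 2).map(lambda x: x + 1)

# Materialize every partition, then simulate losing partition 1 (e.g. a failed node)
materialized = {i: derived.compute_partition(i) for i in range(3)}
del materialized[1]

# Recover the lost partition from the lineage alone
materialized[1] = derived.compute_partition(1)
print(sorted(materialized.items()))   # [(0, [3, 5]), (1, [7, 9]), (2, [11, 13])]
```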

**SparkSQL** is a Spark module for structured data processing. You can interact with SparkSQL using SQL queries and the DataFrame API. SparkSQL uses the same execution engine to compute the result regardless of which API or language you use.

**DataFrame:** a collection of data organized into named columns. DataFrames are conceptually equivalent to a table in a relational database and similar to a data frame in R/Python, but with richer optimizations.
DataFrames are built on top of Spark RDDs.
DataFrames scale from a few kilobytes on a single laptop to petabytes (`10^6 GB`) on a large cluster of machines. They benefit from query optimization and code generation through the Catalyst optimizer.

**Dataset:** Datasets are the newest Spark data abstraction. Like RDDs and DataFrames, Datasets provide an API to access a distributed data collection. A Dataset is a collection of strongly typed Java Virtual Machine objects. Strongly typed means that Datasets are type-safe: the Dataset's data type is made explicit when it is created. Datasets combine the benefits of RDDs, such as lambda functions and type safety, with the SQL optimizations of SparkSQL. Because Datasets are strongly typed, their APIs are currently available only in Scala and Java, which are statically typed languages. Dynamically typed languages, such as Python and R, do not support the Dataset API.

**SparkContext:** the entry point to Spark programming with RDDs and for connecting to a Spark cluster. It is used to programmatically create RDDs, accumulators, and broadcast variables on the cluster. Since Spark 2.0, most of the functionality (methods) available in SparkContext is also available in SparkSession. A SparkContext object named `sc` is available by default in spark-shell, and it can be created programmatically using the SparkContext class.

**SparkSession:** introduced in Spark 2.0, SparkSession became the entry point for programming with DataFrames and Datasets. A SparkSession object named `spark` is available by default in spark-shell, and it can be created programmatically using the SparkSession builder pattern.

## Setup

In [1]:
# Installing required packages
!pip install pyspark
!pip install findspark



In [2]:
#add pyspark to sys.path and initialize
import findspark
findspark.init()

Check <a href="https://github.com/minrk/findspark">this link</a> to understand exactly what findspark.init() does.

## Creating Spark Context and Spark Session

In [3]:
# Import SparkSession (the entry point for the DataFrame API) and create a session, or reuse an existing one
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [4]:
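# Import and create a SparkContext
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("projectName").setMaster("local[*]")
# The SparkSession above already started a SparkContext, so getOrCreate() returns
# that existing context and this conf is not applied (spark.sparkContext is the same object)
sc = SparkContext.getOrCreate(conf)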

## RDDs and basic transformations & actions

**Create an RDD.**

For demonstration purposes, we create an RDD here by calling `sc.parallelize()`. The RDD contains the integers from 1 to 30.

In [5]:
data = range(1,31)
# range objects support indexing: print the first element
print(data[0])

# Create the RDD (a distributed dataset with 4 partitions)
xrangeRDD = sc.parallelize(data, 4)

# this will let us know that we created an RDD
xrangeRDD

1


PythonRDD[1] at RDD at PythonRDD.scala:53

**Transformations**

A transformation is an operation on an RDD that produces a new RDD. Transformations return quickly because the new RDD is lazily evaluated: no computation is carried out when it is created. Instead, the RDD records a series of transformations (computation instructions) that are executed only when an action is called. In the cell below, we subtract 1 from each element of the RDD using a lambda function, then filter the RDD to keep only the elements smaller than 10.

In [6]:
subRDD = xrangeRDD.map(lambda x: x-1)
filteredRDD = subRDD.filter(lambda x : x<10)
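This lazy behavior can be mimicked in plain Python. The hypothetical `LazyPipeline` class below only records `map` and `filter` steps. Nothing runs until `collect()`, which plays the role of the action. It is a teaching sketch, not how PySpark is implemented.

```python
class LazyPipeline:
    """Hypothetical toy model of lazy transformations (not PySpark):
    map/filter only record steps; collect() actually runs them."""

    def __init__(self, data, steps=()):
        self.data = data
        self.steps = tuple(steps)

    def map(self, f):
        return LazyPipeline(self.data, self.steps + (("map", f),))

    def filter(self, pred):
        return LazyPipeline(self.data, self.steps + (("filter", pred),))

    def collect(self):
        # The "action": only now is any element touched
        out = []
        for x in self.data:
            keep = True
            for kind, f in self.steps:
                if kind == "map":
                    x = f(x)
                elif not f(x):
                    keep = False
                    break
            if keep:
                out.append(x)
        return out


calls = []  # records every invocation of the map function

def minus_one(x):
    calls.append(x)
    return x - 1

pipeline = LazyPipeline(range(1, 31)).map(minus_one).filter(lambda x: x < 10)
print(len(calls))            # 0: defining the transformations did no work
print(pipeline.collect())    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(len(calls))            # 30: the work happened during collect()
```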

**Actions**

An action returns a result to the driver. We now apply the collect() action to retrieve the output of the transformations, and count() to get the number of elements.

In [7]:
print(filteredRDD.collect())
filteredRDD.count()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


10
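Actions like `count()` are computed in two stages. First, each partition produces a small partial result where the data lives. Then only those partial results are sent to the driver and combined. PySpark's `RDD.count()` follows this pattern: a per-partition count followed by a sum. The plain-Python sketch below reproduces it. The partition contents are an assumption about how the 4-way split of 1..30 falls after the `map` and `filter` above; check them with `filteredRDD.glom().collect()`.

```python
# Assumed contents of filteredRDD's 4 partitions (see lead-in)
partitions = [[0, 1, 2, 3, 4, 5, 6], [7, 8, 9], [], []]

# Stage 1, on the executors: each partition counts its own elements
partial_counts = [sum(1 for _ in part) for part in partitions]

# Stage 2, on the driver: only the small partial results are combined
total = sum(partial_counts)
print(partial_counts, total)   # [7, 3, 0, 0] 10
```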

**Caching Data**

This simple example shows how to create an RDD and cache it. Notice the speed-up: in the run below, the second count is about 4x faster (exact timings vary between runs and machines). To see the actual computation times, open the Spark UI at host:4040, or use the link displayed by the cell below. You'll see that the second calculation took much less time!

In [8]:
spark

In [9]:
import time 

test = sc.parallelize(range(1,50000),4)
test.cache()

t1 = time.time()
# first count will trigger evaluation of count *and* then cache
count1 = test.count()
dt1 = time.time() - t1
print("dt1: ", dt1)


t2 = time.time()
# second count operates on cached data only
count2 = test.count()
dt2 = time.time() - t2
print("dt2: ", dt2)

dt1:  1.0804994106292725
dt2:  0.2651362419128418


In [10]:
test.count()

49999
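The timing gap above comes from reusing stored results instead of recomputing them. The toy class below models this in plain Python; `CachedDataset` is hypothetical and not part of PySpark. The first `count()` builds the data and keeps it, and the second reuses it. A `time.sleep` call stands in for the expensive recomputation.

```python
import time


class CachedDataset:
    """Hypothetical toy illustration of caching (not PySpark): the first
    action computes and stores the data; later actions reuse it."""

    def __init__(self, compute):
        self._compute = compute   # function that (re)builds the data
        self._cached = None
        self.computations = 0     # how many times the data was actually built

    def count(self):
        if self._cached is None:
            self._cached = self._compute()
            self.computations += 1
        return len(self._cached)


def expensive_build():
    time.sleep(0.2)  # stand-in for reading or recomputing the data
    return list(range(1, 50000))


ds = CachedDataset(expensive_build)
for label in ("first", "second"):
    t0 = time.perf_counter()
    n = ds.count()
    print(label, n, round(time.perf_counter() - t0, 3))
```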

Let's now create an RDD with integers from 1-50. Apply a transformation to multiply every number by 2, resulting in an RDD that contains the first 50 even numbers.

In [11]:
numbers = range(1, 51)
numbers_RDD = sc.parallelize(numbers, 4)
even_numbers_RDD = numbers_RDD.map(lambda x: x*2)

In [12]:
even_numbers_RDD.take(5)

[2, 4, 6, 8, 10]

## DataFrames and SparkSQL

To work with the extremely powerful SQL engine in Apache Spark, you need a **Spark Session**. We created one above; let's verify that it is still active. Feel free to click the "Spark UI" link to explore the Spark UI elements.

In [13]:
spark

**Create Your First DataFrame**

You can create a structured dataset (much like a database table) in Spark, and then use powerful SQL tools to query and join your DataFrames.

In [14]:
# Download the data first into a local `people.json` file
!curl https://raw.githubusercontent.com/hassanrhanimi/Start_with_spark/main/people.json > people.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    73  100    73    0     0    366      0 --:--:-- --:--:-- --:--:--   366


In [15]:
# Read the dataset into a spark dataframe using the `read.json()` function
df = spark.read.json("people.json").cache()

In [16]:
# Print the dataframe as well as the data schema
df.show()
df.printSchema()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [17]:
# To be able to use sql, let's first register the DataFrame as a SQL temporary view
df.createTempView("people")

**Explore the data using DataFrame functions and SparkSQL**

In this section, we explore the dataset using both DataFrame functions and the corresponding SQL queries in SparkSQL. Note the different ways to achieve the same task!

In [18]:
# Select and show basic data columns
df.select("name").show()
df.select(df["name"]).show()
spark.sql("SELECT name FROM people").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [19]:
# Perform basic filtering
df.filter(df["age"] > 21).show()
spark.sql("SELECT age, name FROM people WHERE age > 21").show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [20]:
# Perform basic aggregation of data
df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|  30|    1|
|null|    1|
+----+-----+

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|  30|    1|
|null|    0|
+----+-----+
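The two outputs differ only in the `null` row. `groupBy("age").count()` counts rows, like SQL `COUNT(*)`, whereas `COUNT(age)` ignores null values, so the group whose age is null gets 0. These are standard SQL semantics. The sketch below therefore uses Python's built-in `sqlite3` module as a stand-in for Spark SQL to show the difference on the same three rows. In the notebook, using `COUNT(*)` in the `spark.sql` query would reproduce the DataFrame result.

```python
import sqlite3

# In-memory database holding the same three rows as people.json
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (age INTEGER, name TEXT)")
conn.executemany("INSERT INTO people VALUES (?, ?)",
                 [(None, "Michael"), (30, "Andy"), (19, "Justin")])

# COUNT(*) counts rows; COUNT(age) counts only non-null ages
rows = conn.execute("""
    SELECT age, COUNT(*) AS all_rows, COUNT(age) AS non_null_ages
    FROM people GROUP BY age ORDER BY age
""").fetchall()
conn.close()
print(rows)   # [(None, 1, 0), (19, 1, 1), (30, 1, 1)]
```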



As with the people.json file, now download the people2.json file, load it into a DataFrame, and use SQL to compute the average age in people2.

In [21]:
!curl https://raw.githubusercontent.com/hassanrhanimi/Start_with_spark/main/people2.json > people2.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   136  100   136    0     0   1416      0 --:--:-- --:--:-- --:--:--  1416


In [22]:
df = spark.read.json('people2.json')
# As before, register the DataFrame as a SQL temporary view to be able to query it with SQL
df.createTempView("people2")

In [23]:
spark.sql("SELECT AVG(age) from people2").show()

+--------+
|avg(age)|
+--------+
|    24.8|
+--------+



## Methods for creating Spark DataFrame

There are three common ways to create a DataFrame in Spark:

1. Create a list and parse it as a DataFrame using the createDataFrame() method from the SparkSession.

2. Convert an RDD to a DataFrame using the toDF() method.

3. Import a file into a SparkSession as a DataFrame directly.

### 1. Create DataFrame from a list of data

In [24]:
# 1. Generate toy data as a list of dictionaries
data = [{"Category": 'A', "ID": 1, "Value": 121.44, "Truth": True},
        {"Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
        {"Category": 'C', "ID": 3, "Value": 10.99, "Truth": None},
        {"Category": 'E', "ID": 4, "Value": 33.87, "Truth": True}
        ]

In [25]:
# 2. Import and create a SparkSession : Created already above
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.getOrCreate()

In [26]:
# 3. Create a DataFrame by calling createDataFrame() on the `spark` session
df = spark.createDataFrame(data)

In [27]:
#Print the schema and view the DataFrame in table format
df.printSchema()
df.show()

root
 |-- Category: string (nullable = true)
 |-- ID: long (nullable = true)
 |-- Truth: boolean (nullable = true)
 |-- Value: double (nullable = true)

+--------+---+-----+------+
|Category| ID|Truth| Value|
+--------+---+-----+------+
|       A|  1| true|121.44|
|       B|  2|false|300.01|
|       C|  3| null| 10.99|
|       E|  4| true| 33.87|
+--------+---+-----+------+
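The printed schema shows that Spark inferred a type for each key and listed the columns in alphabetical order. The sketch below is a rough plain-Python approximation of that inference, not Spark's actual code. `infer_schema` is a hypothetical helper: it takes the first non-null value of each key and maps its Python type to a Spark SQL type name. When you need control over column types or order, passing an explicit schema to `createDataFrame()` avoids relying on inference.

```python
def infer_schema(records):
    """Hypothetical, rough approximation of schema inference from a list of
    dicts (not Spark's implementation)."""
    # bool comes before int because bool is a subclass of int in Python
    type_names = [(bool, "boolean"), (int, "long"), (float, "double"), (str, "string")]
    schema = []
    # Sorted keys, matching the column order of the printed schema
    for key in sorted({k for r in records for k in r}):
        sample = next((r[key] for r in records if r.get(key) is not None), None)
        # "null" marks a column whose type could not be determined
        spark_type = next((name for py_type, name in type_names
                           if isinstance(sample, py_type)), "null")
        schema.append((key, spark_type))
    return schema


data = [{"Category": 'A', "ID": 1, "Value": 121.44, "Truth": True},
        {"Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
        {"Category": 'C', "ID": 3, "Value": 10.99, "Truth": None},
        {"Category": 'E', "ID": 4, "Value": 33.87, "Truth": True}]
print(infer_schema(data))
# [('Category', 'string'), ('ID', 'long'), ('Truth', 'boolean'), ('Value', 'double')]
```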



In [28]:
# Option 1: DataFrame API (domain-specific queries)
df.filter(df.Truth==True).sort(df.Value).show()

+--------+---+-----+------+
|Category| ID|Truth| Value|
+--------+---+-----+------+
|       E|  4| true| 33.87|
|       A|  1| true|121.44|
+--------+---+-----+------+



In [29]:
# Option 2: SQL queries
df.createOrReplaceTempView('table')
spark.sql('''SELECT * FROM table 
                WHERE Truth=true 
                ORDER BY Value ASC''').show()

+--------+---+-----+------+
|Category| ID|Truth| Value|
+--------+---+-----+------+
|       E|  4| true| 33.87|
|       A|  1| true|121.44|
+--------+---+-----+------+



### 2. Create DataFrame from RDD

In [30]:
# 2. Import and create a SparkContext: Created already above
# from pyspark import SparkContext, SparkConf
# conf = SparkConf().setAppName("projectName").setMaster("local[*]")
# sc = SparkContext.getOrCreate(conf)

In [31]:
# 3. Generate an RDD from the created data. Check the type to confirm the object is an RDD:
rdd = sc.parallelize(data)
type(rdd)

pyspark.rdd.RDD

In [32]:
# 4. Call the toDF() method on the RDD to create the DataFrame. Test the object type to confirm:
df = rdd.toDF()
type(df)

pyspark.sql.dataframe.DataFrame

### 3. Create DataFrame from Data sources

Spark can handle a wide array of external data sources to construct DataFrames. The general syntax for reading from a file is:

`spark.read.format('<data source>').load('<file path/file name>')`

#### 3.1. Creating from CSV file

Create a Spark DataFrame by directly reading from a CSV file:

`df = spark.read.csv('<file name>.csv')`
    
Read multiple CSV files into one DataFrame by providing a list of paths:

`df = spark.read.csv(['<file name 1>.csv', '<file name 2>.csv', '<file name 3>.csv'])`

By default, Spark treats the first line of a CSV file as data and names the columns `_c0`, `_c1`, and so on. If the file has a header row, set the `header` option before calling `csv()`:

`df = spark.read.option('header', 'true').csv('<file name>.csv')`

Options can be stacked by chaining `option()` calls one after the other. Alternatively, use the `options()` method to pass several options at once:

`df = spark.read.options(header=True, inferSchema=True).csv('<file name>.csv')`
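The plain-Python sketch below reads the same small CSV text both ways to show what the `header` option changes. `read_csv_rows` is a hypothetical helper built on the standard `csv` module, not Spark. Like Spark without `inferSchema`, it keeps every value as a string.

```python
import csv
import io

# A small CSV "file" held in memory for the illustration
raw = "name,age\nAndy,30\nJustin,19\n"


def read_csv_rows(text, header=False):
    """Hypothetical toy model of the header option (not Spark code)."""
    rows = list(csv.reader(io.StringIO(text)))
    if header:
        # First line supplies the column names
        names, body = rows[0], rows[1:]
    else:
        # First line is data; columns get positional names _c0, _c1, ...
        names, body = [f"_c{i}" for i in range(len(rows[0]))], rows
    return [dict(zip(names, r)) for r in body]


print(read_csv_rows(raw))               # first row is {'_c0': 'name', '_c1': 'age'}
print(read_csv_rows(raw, header=True))  # [{'name': 'Andy', 'age': '30'}, {'name': 'Justin', 'age': '19'}]
```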

#### 3.2. Creating from TXT file

Create a DataFrame from a text file with:

`df = spark.read.text('<file name>.txt')`

The `csv()` method can also read a delimited .txt file into a DataFrame, splitting each line into columns. For example:

`df = spark.read.option('header', 'true').csv('<file name>.txt')`

#### 3.3. Creating from JSON file

Make a Spark DataFrame from a JSON file by running:

`df = spark.read.json('<file name>.json')`
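By default, `spark.read.json()` expects JSON Lines: one complete JSON object per line, as in `people.json`. A pretty-printed file spanning several lines needs `spark.read.json('<file name>.json', multiLine=True)`. The plain-Python sketch below mimics how records with different keys become rows. The columns are the union of all keys, sorted as in the schema printed earlier. A missing key becomes null, which is why Michael's age shows as `null`. The sample text is assumed to match the contents of `people.json`.

```python
import json

# Assumed people.json-style input: one JSON object per line (JSON Lines)
text = '{"name":"Michael"}\n{"name":"Andy", "age":30}\n{"name":"Justin", "age":19}\n'

records = [json.loads(line) for line in text.splitlines() if line.strip()]
columns = sorted({key for rec in records for key in rec})           # union of all keys
rows = [tuple(rec.get(col) for col in columns) for rec in records]  # missing key -> None (null)
print(columns)   # ['age', 'name']
print(rows)      # [(None, 'Michael'), (30, 'Andy'), (19, 'Justin')]
```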

#### 3.4. Creating from an XML file

XML file compatibility is not available by default. Install the dependencies to create a DataFrame from an XML source.

1. Download the spark-xml dependency. Save the .jar file in Spark's jars folder.

2. Read an XML file into a DataFrame by running:

`df = (spark.read
            .format('com.databricks.spark.xml')
            .option('rowTag', 'row')
            .load('test.xml'))`

Set the rowTag option to the name of the XML element that represents one row in your file.
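The `rowTag` option tells the reader which XML element marks one record. The sketch below illustrates the idea with Python's standard `xml.etree.ElementTree` instead of spark-xml. `rows_by_tag` is a hypothetical helper and the XML sample is made up. The sketch does no type inference, so every value stays a string.

```python
import xml.etree.ElementTree as ET

# Made-up sample: each <row> element is one record
xml_text = """<rows>
  <row><name>Andy</name><age>30</age></row>
  <row><name>Justin</name><age>19</age></row>
</rows>"""


def rows_by_tag(text, row_tag="row"):
    """Hypothetical toy model of rowTag (not spark-xml's implementation):
    every element named row_tag becomes a record, its children become fields."""
    root = ET.fromstring(text)
    return [{child.tag: child.text for child in elem} for elem in root.iter(row_tag)]


print(rows_by_tag(xml_text))   # [{'name': 'Andy', 'age': '30'}, {'name': 'Justin', 'age': '19'}]
```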

#### 3.5. Creating from an RDBMS database

Reading from an RDBMS requires a JDBC driver connector. The example below shows how to connect to a MySQL database and pull data from it; similar steps work for other database types.

1. Download the MySQL Java driver connector (Connector/J). Save the .jar file in Spark's jars folder.

2. Make sure the MySQL server is running and accepting connections.

3. Connect and fetch a whole MySQL table into a DataFrame:

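`df = (spark.read
            .format('jdbc')
            .option('url', 'jdbc:mysql://localhost:3306/db')
            .option('driver', 'com.mysql.cj.jdbc.Driver')
            .option('dbtable', 'new_table')
            .option('user', 'root')
            .option('password', '<password>')
            .load())`

The driver class above is the one shipped with Connector/J 8.0 and later; older 5.x connectors use `com.mysql.jdbc.Driver`.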


In [33]:
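# Close the SparkSession (this also stops its underlying SparkContext)
spark.stop()

In [34]:
# Close the SparkContext; spark.stop() already stopped it, so this call has no further effect
sc.stop()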

**Sources**:
- IBM lab skills on Coursera (Data Engineering specialization)
- Apache Spark (Apache Software Foundation), https://spark.apache.org
- phoenixNAP, https://phoenixnap.com
