# **Introduction to PySpark**

## Spark
Spark is a platform that makes it easier to work with large datasets by spreading the data and the computations across **clusters** of multiple **nodes**. Each node acts like a small computer that works purely on its own subset of the data, so the full dataset can be processed in **parallel**.
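
To make the split-and-combine idea concrete without a cluster, the plain-Python sketch below mimics the pattern: the data is cut into chunks (standing in for partitions), each chunk is handled by its own worker, and the partial results are combined. This is only an analogy and not how Spark is implemented; the helper names `split_into_chunks` and `parallel_sum` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(data, n_chunks):
    """Split a list into n_chunks roughly equal parts (stand-ins for partitions)."""
    size, remainder = divmod(len(data), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `remainder` chunks get one extra element.
        end = start + size + (1 if i < remainder else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


def parallel_sum(data, n_workers=4):
    """Sum each chunk on its own worker, then combine the partial sums."""
    chunks = split_into_chunks(data, n_workers)
    # Threads only illustrate the pattern here; Spark uses separate machines.
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        partial_sums = list(pool.map(sum, chunks))
    return sum(partial_sums)


numbers = list(range(1, 101))
total = parallel_sum(numbers)
print(total)  # 5050
```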

### Spark in Python
A Spark cluster consists of one **master** node, which controls multiple **worker** nodes. In practice the cluster is usually hosted remotely (e.g. on Azure or Databricks).

A connection to the Spark cluster is created with the `SparkContext` class.


In [1]:
from pyspark import SparkContext

sc = SparkContext()


print(sc.version)
sc

3.5.0


Once a connection with the cluster is established, the `SparkSession` class can be used to interface with the cluster. 

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('example app').getOrCreate()
spark

## DataFrames
The core data structure in Spark is the **Resilient Distributed Dataset (RDD)**, a low-level object that lets Spark distribute data across the nodes of the cluster. **DataFrames** are an abstraction built on top of RDDs that makes the data easier to work with. For complex operations they can even be faster than hand-written RDD code, because Spark can optimize DataFrame queries before running them.

The simplest way to create a DataFrame is to read it from a CSV file (similar to pandas).


In [3]:
df = spark.read.csv("./data/countries.csv", header=True, inferSchema=True)
df.show(5)


+--------------+---------+----+------------+-------------+
|          name|continent|code|surface_area|geosize_group|
+--------------+---------+----+------------+-------------+
|   Afghanistan|     Asia| AFG|      652090|       medium|
|   Netherlands|   Europe| NLD|       41526|        small|
|       Albania|   Europe| ALB|       28748|        small|
|       Algeria|   Africa| DZA|     2381740|        large|
|American Samoa|  Oceania| ASM|         199|        small|
+--------------+---------+----+------------+-------------+
only showing top 5 rows



DataFrames carry metadata about their schema and partitions, which can be inspected with `printSchema()` and `rdd.getNumPartitions()`.

In [4]:
print()
print('DataFrame Schema:')
print('-----------------')
df.printSchema()


DataFrame Schema:
-----------------
root
 |-- name: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- code: string (nullable = true)
 |-- surface_area: string (nullable = true)
 |-- geosize_group: string (nullable = true)



In [5]:
df.rdd.getNumPartitions()

1

### Spark vs Pandas
Spark DataFrames are similar to Pandas DataFrames, especially in their syntax. A Spark DataFrame can be converted to a Pandas DataFrame with `.toPandas()`, and a Pandas DataFrame to a Spark DataFrame with `spark.createDataFrame()`.

Some key differences between a Pandas and Spark DataFrame are:

* *Pandas*
    * in-memory operations
    * eager evaluation (operations are executed immediately)
    * small overhead
    * optimized for small/medium datasets
    * mutable
    * integrates well with Python data-science eco-system (e.g. numpy, scikit-learn)

* *Spark*
    * distributed computing operations
    * lazy evaluation (operations are only executed when a result is requested)
    * large overhead
    * optimized for large datasets
    * immutable
    * integrates well with Apache Spark eco-system (e.g. MLlib, GraphX, Spark Streaming)


In [6]:
df_pd = df.toPandas()
df_pd.head(5)

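|   | name | continent | code | surface_area | geosize_group |
|---|------|-----------|------|--------------|---------------|
| 0 | Afghanistan | Asia | AFG | 652090 | medium |
| 1 | Netherlands | Europe | NLD | 41526 | small |
| 2 | Albania | Europe | ALB | 28748 | small |
| 3 | Algeria | Africa | DZA | 2381740 | large |
| 4 | American Samoa | Oceania | ASM | 199 | small |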


In [7]:
df_sp = spark.createDataFrame(df_pd)
df_sp.show(5)

+--------------+---------+----+------------+-------------+
|          name|continent|code|surface_area|geosize_group|
+--------------+---------+----+------------+-------------+
|   Afghanistan|     Asia| AFG|      652090|       medium|
|   Netherlands|   Europe| NLD|       41526|        small|
|       Albania|   Europe| ALB|       28748|        small|
|       Algeria|   Africa| DZA|     2381740|        large|
|American Samoa|  Oceania| ASM|         199|        small|
+--------------+---------+----+------------+-------------+
only showing top 5 rows



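The round trip above shows that a Spark DataFrame can be turned into a Pandas DataFrame and back. Keep in mind that `.toPandas()` collects all rows onto the driver, so it is only suitable for data that fits in the memory of a single machine.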

## Storage
In Spark, a **DataFrame** is an immutable, distributed collection of data organized into columns. It is stored in memory (or on disk if the DataFrame is very large).

Similar to a SQL database, Spark exposes tables on which SQL queries can be run. A DataFrame can be registered as a **Temporary View** so that it can be queried with SQL. This temporary view is not persisted on the cluster; like the DataFrame, it only exists in memory. A view is registered with one of the following methods:
```
createOrReplaceTempView()
createOrReplaceGlobalTempView()
```

The metadata of all tables in the Spark session (including Temporary Views) is stored in the **catalog**. To keep a DataFrame beyond the session, it can be written to persistent storage, for example as a Hive table or as Apache Parquet files.

In [8]:
df.createOrReplaceTempView("temp_table")
print(spark.catalog.listTables())

[Table(name='temp_table', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]


In [9]:
continent_counts = spark.sql("""
    SELECT
        continent,
        COUNT(*) AS count
    FROM temp_table
    GROUP BY continent
""")

print(type(continent_counts))
continent_counts.show()

<class 'pyspark.sql.dataframe.DataFrame'>
+--------------------+-----+
|           continent|count|
+--------------------+-----+
|              Europe|   42|
| The Democratic R...|    1|
|              Africa|   52|
|             U.S."""|    1|
| Federated States...|    1|
|       North America|   29|
|          British"""|    1|
|       South America|   12|
|             Oceania|   18|
|                Asia|   49|
+--------------------+-----+

