In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


## Spark SQL and DataFrames

### Spark SQL vs RDDs
Spark SQL is a Spark module for structured data processing. Unlike the basic RDD API, the Spark SQL interfaces give Spark more information about the structure of both the data and the computation being performed. Spark uses that extra information to optimize execution.

### Spark DataFrames 
A DataFrame is a two-dimensional labeled data structure with columns of potentially different types. You can think of a DataFrame as a spreadsheet, a SQL table, or a dictionary of Series objects.

### Advantages of using DataFrame API over RDDs

1. Built-in optimization: DataFrames use the Catalyst optimizer, which can improve data processing performance significantly. When an action is called on a DataFrame, Catalyst analyzes the query and resolves its references, then builds a logical plan. The logical plan is optimized and translated into a physical plan, which is finally executed on the cluster.
2. Hive compatible: Spark SQL supports the Hive query language (HiveQL). It can connect to the Hive metastore, so existing Hive tables, queries, and UDFs can be used from Spark.
3. Support for many data formats: The DataFrame API can manipulate many kinds of data, from structured and semi-structured data files to highly structured Parquet files.
4. Multi-language support: DataFrame APIs are available in Python, R, Scala, and Java.
5. Schema support: We can define a schema manually or read it from a data source. The schema defines the column names and their data types.


### Comparing Spark DF API vs RDDs
| Feature                | RDD | DataFrame |
|------------------------|-----|-----------|
| Abstraction Level      | Low | High      |
| Immutable              | Yes | Yes       |
| Fault tolerant         | Yes | Yes       |
| Type safe              | Yes | No        |
| Schema                 | No  | Yes       |
| Execution Optimization | No  | Yes       |


### Pandas DataFrame vs Spark DataFrame

| Spark DataFrame | Pandas DataFrame |
|-----------------|------------------|
| Supports parallel execution. | Does not parallelize operations on its own. |
| Runs across multiple nodes. | Runs on a single node. |
| Follows lazy execution: a task is not executed until an action is performed. | Follows eager execution: a task is executed immediately. |
| Is immutable. | Is mutable. |
| Complex operations are harder to express than in pandas. | Complex operations are easier to express than in Spark. |
| Is distributed, so processing is faster for large amounts of data. | Is not distributed, so processing is slower for large amounts of data. |
| `sparkDataFrame.count()` returns the number of rows. | `pandasDataFrame.count()` returns the number of non-NA/null observations for each column. |
| Well suited to building scalable applications. | Not designed for applications that must scale beyond a single machine. |
| Provides fault tolerance. | Does not provide fault tolerance; we need to implement our own framework for it. |
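
The `count()` row above is easy to trip over, so here is a small pandas-only sketch. The frame `toy` and its values are made up for illustration; it is not the penguins data.

```python
import pandas as pd

# Hypothetical three-row frame with one missing value in "mass".
toy = pd.DataFrame({
    "species": ["Adelie", "Gentoo", "Chinstrap"],
    "mass": [3750.0, None, 3800.0],
})

# len() gives the number of rows, which is what Spark's df.count() reports.
row_count = len(toy)

# pandas count() gives the number of non-null values in each column.
non_null_counts = toy.count().to_dict()

print(row_count)
print(non_null_counts)
```

In pandas, `len(df)` or `df.shape[0]` is the closer equivalent of Spark's `df.count()`.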

In [4]:
import pandas as pd

In [8]:
# 🐼 pandas 
pandasdf = pd.read_csv('../data/penguins.csv')
pandasdf.shape

# 🎇 PySpark
df = spark.read.csv('../data/penguins.csv', header=True, inferSchema=True)
df.count(), len(df.columns)

(344, 7)

### Inspecting DataFrames


In [9]:
# 🐼 pandas 
pandasdf.info()
# 🎇 PySpark
df.printSchema()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 344 entries, 0 to 343
Data columns (total 7 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   species            344 non-null    object 
 1   island             344 non-null    object 
 2   bill_length_mm     342 non-null    float64
 3   bill_depth_mm      342 non-null    float64
 4   flipper_length_mm  342 non-null    float64
 5   body_mass_g        342 non-null    float64
 6   sex                333 non-null    object 
dtypes: float64(4), object(3)
memory usage: 18.9+ KB
root
 |-- species: string (nullable = true)
 |-- island: string (nullable = true)
 |-- bill_length_mm: double (nullable = true)
 |-- bill_depth_mm: double (nullable = true)
 |-- flipper_length_mm: integer (nullable = true)
 |-- body_mass_g: integer (nullable = true)
 |-- sex: string (nullable = true)



In [10]:
# 🐼 pandas 
pandasdf.head()
# 🎇 PySpark
df.show(5)

+-------+---------+--------------+-------------+-----------------+-----------+------+
|species|   island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|   sex|
+-------+---------+--------------+-------------+-----------------+-----------+------+
| Adelie|Torgersen|          39.1|         18.7|              181|       3750|  MALE|
| Adelie|Torgersen|          39.5|         17.4|              186|       3800|FEMALE|
| Adelie|Torgersen|          40.3|         18.0|              195|       3250|FEMALE|
| Adelie|Torgersen|          null|         null|             null|       null|  null|
| Adelie|Torgersen|          36.7|         19.3|              193|       3450|FEMALE|
+-------+---------+--------------+-------------+-----------------+-----------+------+
only showing top 5 rows



In [13]:
# 🐼 pandas 
pandasdf[['species', 'island']].head(3)
# 🎇 PySpark
df[['species', 'island']].show(3)

+-------+---------+
|species|   island|
+-------+---------+
| Adelie|Torgersen|
| Adelie|Torgersen|
| Adelie|Torgersen|
+-------+---------+
only showing top 3 rows



### Filtering DataFrames


In [14]:
# 🐼 pandas 
pandasdf[pandasdf['species']=='Gentoo'].head()
# 🎇 PySpark
df[df['species']=='Gentoo'].show(5)

+-------+------+--------------+-------------+-----------------+-----------+------+
|species|island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|   sex|
+-------+------+--------------+-------------+-----------------+-----------+------+
| Gentoo|Biscoe|          46.1|         13.2|              211|       4500|FEMALE|
| Gentoo|Biscoe|          50.0|         16.3|              230|       5700|  MALE|
| Gentoo|Biscoe|          48.7|         14.1|              210|       4450|FEMALE|
| Gentoo|Biscoe|          50.0|         15.2|              218|       5700|  MALE|
| Gentoo|Biscoe|          47.6|         14.5|              215|       5400|  MALE|
+-------+------+--------------+-------------+-----------------+-----------+------+
only showing top 5 rows



In [15]:
# 🐼 pandas 
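pandasdf[pandasdf['species'].str.match('G.')].head()
# The mass column is named body_mass_g, and sex is stored as 'MALE'/'FEMALE'
pandasdf[(pandasdf['body_mass_g'] < 3400) & (pandasdf['sex'] == 'MALE')].head()
# 🎇 PySpark
df[(df['body_mass_g'] < 3400) & (df['sex'] == 'MALE')].show(5)
# rlike matches anywhere in the string; '^' anchors it to the start, like str.match
df[df['species'].rlike('^G.')].show(5)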

Unnamed: 0,species,island,bill_length_mm,bill_depth_mm,flipper_length_mm,body_mass_g,sex
220,Gentoo,Biscoe,46.1,13.2,211.0,4500.0,FEMALE
221,Gentoo,Biscoe,50.0,16.3,230.0,5700.0,MALE
222,Gentoo,Biscoe,48.7,14.1,210.0,4450.0,FEMALE
223,Gentoo,Biscoe,50.0,15.2,218.0,5700.0,MALE
224,Gentoo,Biscoe,47.6,14.5,215.0,5400.0,MALE


### Sorting DataFrames

In [None]:
# 🐼 pandas 
pandasdf.nsmallest(5, 'body_mass_g')
# 🎇 PySpark
df[df['body_mass_g'].isNotNull()].orderBy('body_mass_g').show(5)

### Aggregating DataFrames

In [None]:
# 🐼 pandas 
pandasdf.agg({'flipper_length_mm': 'mean'})
# 🎇 PySpark
df.agg({'flipper_length_mm': 'mean'}).show()

In the PySpark aggregation, 'avg' can be used in place of 'mean'. Grouped data offers both as methods too: df.groupBy('species').avg().show() returns the per-species average of every numeric column.


In [None]:
# 🐼 pandas 
pandasdf['species'].unique()
# 🎇 PySpark
df.select('species').distinct().show()

In [None]:
# 🐼 pandas 
pandasdf.groupby('species')['body_mass_g'].mean()
# 🎇 PySpark
df.groupBy('species').agg({'body_mass_g': 'mean'}).show()