# A Gentle Introduction to PySpark and the World of Distributed Computing

1. [Introduction](#section1)
2. [Differences Between Pandas and Spark Dataframes](#section2)
3. [Resilient Distributed Datasets (RDDs)](#section3)
4. [Creating a Pyspark DataFrame](#section4)
    - [Importing the necessary modules and setting up Pyspark for this tutorial](#section5)
    - [Creating DataFrame from RDD](#section6)
    - [Creating the DataFrame from CSV file](#section7)
    - [Viewing the data](#section8)
        - [Adjusting view](#section9)
        - [Dataframe shape](#section10)
5. [Basic DataFrame Manipulations](#section11)
    - [Converting string to datetime](#section12)
    - [Some basic statistics](#section13)
        - [Aggregates](#section14)
        - [Descriptions](#section15)
    - [Simple calculations](#section16)
        - [Calculated Columns](#section17)
        - [Aggregations](#section18)
        - [Value counts](#section19)
    - [Subsetting](#section20)
        - [On single condition](#section21)
        - [On multiple conditions](#section22)
        - [On time ranges](#section23)
    - [Differences](#section24)
        - [Amounts](#section25)
        - [In time](#section26)
    - [Some data adjustments](#section27)
6. [Handling nulls](#section28)
    - [Finding nulls](#section29)
    - [Filling nulls](#section30)
7. [Basic string matching](#section31)
8. [Binarising](#section32)

<a id="section1"></a>
## 1. Introduction

PySpark, the Python API for Apache Spark, is a great tool for performing exploratory data analysis at scale, building machine learning pipelines, and creating ETLs for a data platform. If you’re already familiar with Python and libraries such as Pandas, then PySpark is a natural next step for creating more scalable analyses and pipelines. The goal of this pop-up class is to show how to get up and running with PySpark, without a huge amount of work, and to enable you to perform common tasks.

The key data type used in PySpark is the Spark dataframe. This object can be thought of as a table distributed across a cluster, with functionality similar to tables in SQL and dataframes in Pandas. If you want to do distributed computation using PySpark, you’ll need to perform operations on Spark dataframes rather than on other Python data types.
It is possible to use Pandas dataframes alongside Spark by calling the `toPandas()` method on a Spark dataframe, which returns a Pandas dataframe. However, this method should generally be avoided except when working with small dataframes, because it pulls the entire object into memory on a single node.

<img src="https://d1jnx9ba8s6j9r.cloudfront.net/blog/wp-content/uploads/2018/07/PySpark.png" alt="Drawing" style="width: 800px;"/>

<a id="section2"></a>
## 2. Differences Between Pandas and Spark Dataframes

One of the key differences between Pandas and Spark dataframes is eager versus lazy execution. In PySpark, operations are delayed until a result is actually needed in the pipeline. For example, you can specify operations for loading a data set from a source and applying a number of transformations to the dataframe, but these operations won’t be applied immediately. Instead, a graph of transformations is recorded, and once the data is actually needed, for example when writing the results out to storage or displaying them, the transformations are applied as a single pipeline operation. This approach avoids pulling the full dataframe into memory and enables more effective processing across a cluster of machines.

<img src="https://d1jnx9ba8s6j9r.cloudfront.net/blog/wp-content/uploads/2018/07/Shared-Memory.png" alt="Drawing" style="width: 1200px;"/>

With Pandas dataframes, everything is pulled into memory, and every Pandas operation is applied immediately.
In general, it is best practice to avoid eager operations in Spark where possible, since they limit how much of your pipeline can be effectively distributed.
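
To make the contrast between lazy and eager execution concrete, here is a toy model in plain Python. It is only a sketch of the idea: the `LazyPipeline` class and its methods are hypothetical names invented for this illustration and are not part of the Spark API. Transformations are merely recorded, and nothing runs until the `collect()` action is called.

```python
class LazyPipeline:
    """Toy stand-in for a lazily evaluated dataset (hypothetical, not Spark API)."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)  # recorded transformations, not yet run

    def map(self, fn):
        # Transformation: only record the step and return a new pipeline.
        return LazyPipeline(self._rows, self._steps + (("map", fn),))

    def filter(self, predicate):
        # Transformation: again, nothing is computed here.
        return LazyPipeline(self._rows, self._steps + (("filter", predicate),))

    def collect(self):
        # Action: run every recorded step in a single pass over the data.
        result = []
        for row in self._rows:
            keep = True
            for kind, fn in self._steps:
                if kind == "map":
                    row = fn(row)
                elif not fn(row):
                    keep = False
                    break
            if keep:
                result.append(row)
        return result


calls = []

def double(x):
    calls.append(x)  # track when the function really runs
    return x * 2

pipeline = LazyPipeline([1, 2, 3, 4]).map(double).filter(lambda x: x > 4)
calls_before_action = len(calls)  # nothing has executed yet
result = pipeline.collect()       # the action triggers the whole pipeline
```

Building `pipeline` does not call `double` at all; the work happens only inside `collect()`. Real Spark goes further by optimising and distributing the recorded plan, which this sketch does not attempt.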

<img src="https://d1jnx9ba8s6j9r.cloudfront.net/blog/wp-content/uploads/2018/07/Distributed-Memory.png" alt="Drawing" style="width: 1200px;"/>

<a id="section3"></a>
## 3. Resilient Distributed Datasets (RDDs)

PySpark dataframes are built on top of Resilient Distributed Datasets (RDDs). So how exactly does an RDD work? The data in an RDD is split into chunks, called partitions, which are distributed across the executor nodes of the cluster. RDDs are resilient, i.e., they can recover from failures: each RDD records the lineage of transformations that produced it, so if an executor node fails, the lost partitions can be recomputed on another node. Because the partitions are processed in parallel, you can run calculations against your dataset very quickly by harnessing the power of multiple nodes working on one job at the same time.

<img src="https://d1jnx9ba8s6j9r.cloudfront.net/blog/wp-content/uploads/2018/07/Partitions-768x343.png" alt="Drawing" style="width: 600px;"/>

Features of RDDs:
1. **In-Memory Computations:** Keeping intermediate data in memory can improve performance by orders of magnitude.
2. **Lazy Evaluation:** All transformations on RDDs are lazy, i.e., they do not compute their results right away.
3. **Fault Tolerant:** RDDs track data lineage information to rebuild lost data automatically.
4. **Immutability:** Once an RDD is defined, its contents cannot be changed; transformations produce new RDDs instead.
5. **Partitioning:** The partition is the fundamental unit of parallelism in a PySpark RDD.
6. **Persistence:** Users can reuse PySpark RDDs and choose a storage strategy for them.
7. **Coarse-Grained Operations:** Operations such as map, filter, or group by are applied to all elements of the dataset at once.
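
Several of the features above (partitioning, immutability, and lineage-based fault tolerance) can be illustrated with a small plain-Python model. This is a sketch under simplifying assumptions: `ToyRDD` and its methods are hypothetical names made up for the example, only `map` is supported, and everything runs in a single process rather than on a cluster.

```python
class ToyRDD:
    """Hypothetical miniature of an RDD: partitions plus the lineage that built them."""

    def __init__(self, source_partitions, lineage=()):
        self.source_partitions = source_partitions  # input data, never modified
        self.lineage = tuple(lineage)               # ordered map functions
        self.cache = {}                             # partition index -> computed rows

    def map(self, fn):
        # Returns a new ToyRDD; the original is left untouched (immutability).
        return ToyRDD(self.source_partitions, self.lineage + (fn,))

    def compute_partition(self, index):
        # Build (or rebuild) one partition by replaying the lineage on its source rows.
        if index not in self.cache:
            rows = list(self.source_partitions[index])
            for fn in self.lineage:
                rows = [fn(row) for row in rows]
            self.cache[index] = rows
        return self.cache[index]

    def collect(self):
        # Gather the rows of every partition in order.
        return [row
                for index in range(len(self.source_partitions))
                for row in self.compute_partition(index)]


rdd = ToyRDD(((1, 2), (3, 4), (5, 6))).map(lambda x: x + 1).map(lambda x: x * 10)
first_result = rdd.collect()

del rdd.cache[1]                      # simulate losing the computed partition 1
recovered = rdd.compute_partition(1)  # only that partition is recomputed from lineage
```

Because the lineage is stored alongside the source data, the lost partition can be rebuilt without recomputing the other two, which is the essence of how RDDs recover from a failed executor.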

<img src="https://d1jnx9ba8s6j9r.cloudfront.net/blog/wp-content/uploads/2018/07/PySpark-RDD-Features.png" alt="Drawing" style="width:800px;"/>

<a id="section4"></a>
## 4. Creating a Pyspark DataFrame

<img src="https://i1.wp.com/www.analyticsvidhya.com/wp-content/uploads/2016/10/DataFrame-in-Spark.png?w=600&ssl=1" alt="Drawing" style="width: 500px;"/>

<a id="section5"></a>
### Importing the necessary modules and setting up Pyspark for this tutorial

In [None]:
import pandas as pd
import pyspark
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql import SQLContext
import pyspark.sql.functions as func

# Create the SparkContext only once per session: calling SparkContext() a second
# time raises an error, so keep the context setup together in a single cell.
sc = SparkContext()
sqlContext = SQLContext(sc)

<a id="section6"></a>
### Creating DataFrame from RDD

In [None]:
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

<a id="section7"></a>
### Creating the DataFrame from CSV file

#### Pandas

In [None]:
df_pandas = pd.read_csv('Sales.csv')

#### Pyspark

In [None]:
from pyspark import SparkFiles
sqlContext = SQLContext(sc)
df_pyspark = sqlContext.read.csv(SparkFiles.get("D:/Users/Daniel Harty/Pyspark Tutorial/Sales.csv"), header=True, inferSchema=False)

<a id="section8"></a>
### Viewing the data

In [None]:
df_pyspark.printSchema()

Something is not quite right with this PySpark dataframe. What is the problem?

In [None]:
df_pyspark = sqlContext.read.csv(SparkFiles.get("D:/Users/Daniel Harty/Pyspark Tutorial/Sales.csv"), header=True, inferSchema=True)
df_pyspark.printSchema()
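
The second read behaves differently because, without schema inference, Spark reads every CSV column as a string. The plain-Python sketch below mimics that behaviour with the standard `csv` module. The two sample rows and the `infer_value` helper are hypothetical, and the helper converts value by value, whereas Spark infers a single type per column.

```python
import csv
import io

# Hypothetical two-row sample; the column names mirror ones used in this tutorial.
raw = "Brand,Sales Price\nPRADA,120.5\nGUESS,80\n"

def read_rows(text):
    # csv.DictReader, like Spark without inferSchema, yields every value as a string.
    return list(csv.DictReader(io.StringIO(text)))

def infer_value(value):
    # Try progressively wider numeric types and fall back to the original string.
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            continue
    return value

as_strings = read_rows(raw)
inferred = [{key: infer_value(val) for key, val in row.items()} for row in as_strings]
```

In `as_strings` the price is the text `"120.5"`, so arithmetic and numeric comparisons on it would not behave as intended; after inference it is a number.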

<a id="section9"></a>
#### Adjusting view

In [None]:
df_pyspark.head(5)

Once again, something is not quite right. `head()` returns the correct information, but as a list of `Row` objects, which is very hard to read. The `show()` method prints a formatted table instead:

In [None]:
df_pyspark.show(5, truncate = False)

By default, `show()` truncates string values longer than 20 characters. Passing `truncate=False` after the number of rows you want to show displays the full text of each field.

<a id="section10"></a>
#### Dataframe shape

##### Pandas

In [None]:
df_pandas.shape

##### Pyspark

In [None]:
print(str(df_pyspark.count()) + ', ' + str(len(df_pyspark.columns)))

<a id="section11"></a>
### 5. Basic DataFrame Manipulations

<a id="section12"></a>
#### Converting string to datetime 

##### Pandas

In [None]:
df_pandas['Sales Date'] = pd.to_datetime(df_pandas['Sales Date'])

In [None]:
df_pandas.head(5)

##### Pyspark

###### Newer pyspark versions

In [None]:
from pyspark.sql.functions import to_timestamp
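# The pattern must match how dates are written in the CSV; 'dd/MM/yyyy' is the
# format used in the older-version cell below.
df_pyspark = df_pyspark.withColumn("Sales Date", to_timestamp("Sales Date", "dd/MM/yyyy"))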

###### Older versions of pyspark

In [None]:
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql.types import DateType

df_pyspark = df_pyspark.withColumn('Sales Date', from_unixtime(unix_timestamp('Sales Date', 'dd/MM/yyyy')))
df_pyspark = df_pyspark.withColumn("Sales Date",df_pyspark['Sales Date'].cast(DateType()))

In [None]:
df_pyspark.show(5)

<a id="section13"></a>
#### Some basic statistics

<a id="section14"></a>
##### Aggregation Functions

###### Pandas

In [None]:
round(df_pandas['Sales Price'].mean(),2)

###### Pyspark

In [None]:
df_pyspark.agg({'Sales Price': 'mean'}).collect()[0][0]

<a id="section15"></a>
#### Descriptions

##### Descriptions of single columns

###### Pandas

In [None]:
df_pandas['Sales Price'].describe()

###### Pyspark

In [None]:
df_pyspark.select(['Sales Price']).describe().show()

##### Descriptions of all columns

###### Pandas

In [None]:
df_pandas.describe()

###### Pyspark

In [None]:
df_pyspark.describe().toPandas()

<a id="section16"></a>
#### Simple calculations

<a id="section17"></a>
##### Calculated columns

###### Pandas

In [None]:
df_pandas['Price_diff'] = df_pandas['Sales Price']-df_pandas['Sales Price (EX)']

In [None]:
df_pandas[['Supplier', 'Sales Price', 'Sales Price (EX)', 'Price_diff']].head()

###### Pyspark

In [None]:
df_pyspark = df_pyspark.withColumn('Price_diff', func.round(df_pyspark['Sales Price'] - df_pyspark['Sales Price (EX)'],2))
df_pyspark.select('Supplier', 'Sales Price', 'Sales Price (EX)', 'Price_diff').show()

<a id="section18"></a>
##### Aggregations

###### Pandas

In [None]:
df_pandas.groupby(['Supplier', 'Brand', 'Category']).size().head()

###### Pyspark

In [None]:
df_pyspark.groupBy('Supplier', 'Brand', 'Category').count().show(5)

<a id="section19"></a>
##### Value counts

###### Pandas

In [None]:
df_pandas.Brand.value_counts()

###### Pyspark

In [None]:
value_counts_test = df_pyspark.groupBy('Brand').count()
value_counts_test.orderBy('count', ascending = False).show()

<a id="section20"></a>
#### Subsetting

<a id="section21"></a>
##### On single condition

###### Pandas

In [None]:
df_pandas[df_pandas['Brand'] == 'PRADA'].head()

###### Pyspark

With PySpark, you can subset in two equivalent ways: the `where` method or the `filter` method (`where` is an alias for `filter`).

In [None]:
df_pyspark.where(df_pyspark["Brand"] == 'PRADA').show(5)

In [None]:
df_pyspark.filter(df_pyspark["Brand"] == 'PRADA').show(5)

<a id="section22"></a>
##### On multiple conditions

###### Pandas

In [None]:
df_pandas[(df_pandas['Brand'] == 'PRADA') & (df_pandas['Q3'] == 1)].head()

###### Pyspark

In [None]:
df_pyspark.where((df_pyspark["Brand"] == 'PRADA') & (df_pyspark["Q3"] == 1)).show(5)

In [None]:
df_pyspark.filter((df_pyspark["Brand"] == 'PRADA') & (df_pyspark["Q3"] == 1)).show(5)

<a id="section23"></a>
##### On time ranges

###### Pandas

In [None]:
df_pandas[(df_pandas['Sales Date'] >= '2018-06-01') & (df_pandas['Sales Date'] <= '2018-10-01')].head()

###### Pyspark

In [None]:
df_pyspark.filter((df_pyspark['Sales Date'] >= "2018-06-01") & (df_pyspark['Sales Date'] <= "2018-10-01")).show(5)

<a id="section24"></a>
#### Differences

<a id="section25"></a>
##### Amounts

What if you want to see how the sales price changes from one sale to the next for the Prada items supplied by Luxottica?

###### Pandas

In [None]:
diff_test = df_pandas[(df_pandas['Supplier'] == 'LUXOTTICA') & (df_pandas['Brand'] == 'PRADA')]
diff_test = diff_test.sort_values(by=['Sales Date'])
diff_test['price_diff'] = diff_test['Sales Price'].diff()

In [None]:
diff_test.head()

###### Pyspark

In [None]:
diff_test_ps = df_pyspark.where((df_pyspark['Supplier'] == 'LUXOTTICA') & (df_pyspark['Brand'] == 'PRADA'))

PySpark can perform many of the same operations as Pandas; however, many of them must be imported explicitly rather than being available immediately.

In [None]:
from pyspark.sql.window import Window

In [None]:
# Define the window: order the filtered rows by sale date
window = Window.orderBy('Sales Date')

# Create a lag column holding the price from the previous row
df_lag = diff_test_ps.withColumn('prev_price', func.lag(diff_test_ps['Sales Price']).over(window))

# Subtract the previous sale's price from the current one
result = df_lag.withColumn('price_diff', func.round(df_lag['Sales Price'] - df_lag['prev_price'], 2))

In [None]:
result.select('Supplier', 'Brand', 'Sales Date', 'price_diff').show()

<a id="section26"></a>
##### In time

What if you want to calculate the number of days between consecutive transactions for each supplier?

###### Pandas

In [None]:
df_pandas = df_pandas.sort_values(by=['Sales Date'])
df_pandas['diff'] = df_pandas.groupby('Supplier')['Sales Date'].diff().dt.days

In [None]:
df_pandas.head()

###### Pyspark

In [None]:
# Define the window: one partition per supplier, ordered by sale date
window = Window.partitionBy('Supplier').orderBy('Sales Date')

df_pyspark = df_pyspark.withColumn("days_passed", func.datediff(df_pyspark['Sales Date'], func.lag(df_pyspark['Sales Date'], 1).over(window)))

In [None]:
df_pyspark.select('Supplier', 'Brand', 'Sales Date', 'days_passed').show(5)
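
To see what the partitioned window above computes, here is a plain-Python sketch of the same logic: group by supplier (`partitionBy`), sort by date (`orderBy`), look at the previous row (`lag`), and take the difference in days (`datediff`). The sample transactions and the `days_since_previous` helper are hypothetical and are not taken from `Sales.csv`.

```python
from collections import defaultdict
from datetime import date

# Hypothetical transactions: (supplier, sale date).
sales = [
    ("A", date(2018, 6, 1)),
    ("B", date(2018, 6, 3)),
    ("A", date(2018, 6, 5)),
    ("B", date(2018, 6, 4)),
    ("A", date(2018, 6, 12)),
]

def days_since_previous(rows):
    # partitionBy: group the sale dates by supplier.
    partitions = defaultdict(list)
    for supplier, sale_date in rows:
        partitions[supplier].append(sale_date)

    result = []
    for supplier, dates in partitions.items():
        dates.sort()     # orderBy within the partition
        previous = None  # lag has no value for the first row of a partition
        for current in dates:
            gap = None if previous is None else (current - previous).days
            result.append((supplier, current, gap))
            previous = current
    return result

days_passed = days_since_previous(sales)
```

The first transaction of each supplier has no predecessor, so its gap is `None`, just as the Spark version yields a null for the first row in each partition.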

<a id="section27"></a>
#### Some data adjustments

Let's say that the price of Beyond products has increased by 5%. How do we make this adjustment?

##### Pandas

In [None]:
df_pandas[df_pandas['Brand'] == 'BEYOND'].head()

In [None]:
df_pandas.loc[df_pandas['Brand'] == 'BEYOND', ['Sales Price', 'Sales Price (EX)']] *= 1.05

In [None]:
df_pandas[df_pandas['Brand'] == 'BEYOND'].head()

##### Pyspark

In [None]:
condition_beyond = (func.col('Brand') == 'BEYOND')

In [None]:
df_pyspark_readjust = (
    df_pyspark
    .withColumn('Sales Price', func.when(condition_beyond, func.col('Sales Price') * 1.05).otherwise(func.col('Sales Price')))
    .withColumn('Sales Price (EX)', func.when(condition_beyond, func.col('Sales Price (EX)') * 1.05).otherwise(func.col('Sales Price (EX)')))
)

In [None]:
df_pyspark_readjust.where(func.col('Brand') == 'BEYOND').select('Brand', 'Sales Price', 'Sales Price (EX)').show(5)

<a id="section28"></a>
### 6. Handling nulls

<a id="section29"></a>
#### Finding nulls

##### Pandas

In [None]:
df_pandas[df_pandas['Brand'].isnull()]

##### Pyspark

In [None]:
df_pyspark.where(df_pyspark['Brand'].isNull()).show()

<a id="section30"></a>
#### Filling nulls

##### Pandas

In [None]:
df_pandas['Brand'] = df_pandas['Brand'].fillna('empty')

##### Pyspark

In [None]:
df_pyspark = df_pyspark.na.fill("NA", 'Brand')

In [None]:
# Confirm that no nulls remain in the Brand column after filling
df_pyspark.where(df_pyspark['Brand'].isNull()).show()

<a id="section31"></a>
### 7. Basic string matching

We saw that Armani and Guess are present in the names of two brands each. How do we extract these at the same time?
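
Both cells below rely on the same regular expression, `ARMANI|GUESS`, where `|` means "either alternative". As a small illustration, here is how that pattern behaves with Python's standard `re` module. The brand names in the list are hypothetical and were chosen only to demonstrate the matching.

```python
import re

# Hypothetical brand names, used only to illustrate the pattern.
brands = ["EMPORIO ARMANI", "ARMANI EXCHANGE", "GUESS", "GUESS BY MARCIANO", "PRADA"]

pattern = re.compile("ARMANI|GUESS")  # "|" matches either alternative

# re.search finds the pattern anywhere in the string, which is also how
# str.contains (Pandas) and rlike (Spark) apply a regular expression.
matches = [brand for brand in brands if pattern.search(brand)]
```

Because the match may occur anywhere in the string, a single pattern picks up every brand whose name contains either word.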

#### Pandas

In [None]:
df_pandas[df_pandas['Brand'].str.contains("ARMANI|GUESS")]

#### Pyspark

In [None]:
double_brand = df_pyspark.where(df_pyspark['Brand'].rlike("ARMANI|GUESS"))

In [None]:
double_brand = double_brand.groupBy('Brand').count()
double_brand.orderBy('count', ascending = False).show()

<a id="section32"></a>
### 8. Binarising

What if you want to add a binary column?

#### Pandas

In [None]:
df_pandas['zero_val'] = (df_pandas['Sales Price'] == 0).astype(int)

In [None]:
df_pandas.head()

#### Pyspark

In [None]:
df_pyspark = df_pyspark.withColumn('zero_val', func.when(func.col('Sales Price') == 0, 1).otherwise(0))
df_pyspark.select('Supplier','Brand', 'Sales Price', 'zero_val').show()