# PySpark
PySpark is a Python API for Apache Spark. Apache Spark is an analytical processing engine designed for large-scale, high-performance distributed data processing and machine learning applications. 

PySpark is widely used in the data science and machine learning communities, partly because many popular data science libraries, such as NumPy and TensorFlow, are written in Python, and partly because it processes large datasets efficiently.
Many organisations, including Walmart, Trivago, Sanofi and Runtastic, have used PySpark.

## Architecture
The Spark architecture runs an application in the following steps:

**STEP 1**
The client submits the Spark application code. The driver implicitly converts the user code, which contains transformations and actions, into a logical directed acyclic graph (DAG). At this stage, it also performs optimizations such as pipelining transformations.

**STEP 2**
The driver then converts the logical DAG into a physical execution plan made up of stages, and creates physical execution units called tasks within each stage. The tasks are then bundled and sent to the cluster.

**STEP 3**
The driver negotiates resources with the cluster manager, which launches executors on the worker nodes on behalf of the driver. When the executors start, they register themselves with the driver, so the driver has a complete view of the executors running the tasks. The driver then sends tasks to the executors based on data placement.

**STEP 4**
While the tasks run, the driver program monitors the set of running executors. It also schedules future tasks based on data placement.

![architecture](architecture.png "Pyspark Architecture")

## PySpark Features
- In-memory computation
- Distributed processing using parallelize
- Can be used with many cluster managers (Spark Standalone, YARN, Mesos, etc.)
- Fault-tolerant
- Immutable
- Lazy evaluation
- Cache & persistence
- Built-in optimization when using DataFrames
- Supports ANSI SQL

## Advantages of PySpark
- Spark, the engine behind PySpark, is a general-purpose, in-memory, distributed processing engine that lets you process data efficiently in a distributed fashion.
- Applications running on PySpark can be much faster than traditional disk-based systems such as Hadoop MapReduce; Apache Spark has cited speedups of up to 100x for some in-memory workloads.
- PySpark is well suited to building data ingestion pipelines.
- PySpark can process data from Hadoop HDFS, AWS S3 and many other file systems.
- PySpark can also process real-time data using Spark's streaming APIs and sources such as Kafka.
- With PySpark streaming, you can also stream files from a file system or read data from a socket.
- PySpark ships with a machine learning library (MLlib); graph processing is available through the separate GraphFrames package.

## Installation Guide
To install PySpark, follow the guidelines on the official PySpark page:

- [PySpark Installation Guide](https://spark.apache.org/docs/latest/api/python/getting_started/install.html)

*OR* 

You can use *Docker* to set up the environment by following this guide:

- [PySpark using Docker](https://towardsdatascience.com/stuck-trying-to-get-pyspark-to-work-in-your-data-science-environment-here-is-another-way-fb80a4bb7d8f)

## Dataset Used
The dataset used in this tutorial can be found [here](https://www.kaggle.com/datasets/dinnymathew/usstockprices).

# PySpark Code Samples

## Importing Modules

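```python
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Note: this wildcard import shadows Python built-ins such as min, max and sum;
# the later examples rely on the PySpark versions of these functions.
from pyspark.sql.functions import *
```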

## Creating Spark App

The entry point into all functionality in Spark is the SparkSession class.

```python
# Run locally using all available cores ("local[*]") and name the application.
spark = SparkSession.builder \
        .master('local[*]') \
        .appName('Pyspark_Tut') \
        .getOrCreate()
```

## Read Data

Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on using relational transformations and can also be used to create a temporary view.

### Reading csv file
PySpark provides `spark.read.csv("file_name")` to read a file or directory of files in CSV format into a Spark DataFrame. The `option()` method can be used to customize the behavior of reading or writing, such as the handling of the header, the delimiter character, the character set, and so on.

The `printSchema()` method is used to display the schema of the PySpark dataframe.

```python
s_data = spark.read.csv(
    "stocks_price_final.csv",
    sep=',',
    header=True
)

s_data.printSchema()
```

```
root
 |-- _c0: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- date: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- adjusted: string (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)
```



Spark can infer the schema of the data on the fly. By default, `inferSchema` is set to `False`, so no schema inference is done and every column is read as a string, as the schema above shows.

### Reading with infer schema

When `inferSchema` is set to `True`, PySpark infers the input schema automatically from the data. *This requires one extra pass over the data*.

```python
inferSchema_data = spark.read.csv(
    "stocks_price_final.csv",
    sep=',',
    inferSchema=True,
    header=True
)

inferSchema_data.printSchema()
```

```
root
 |-- _c0: integer (nullable = true)
 |-- symbol: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- adjusted: string (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)
```



### Define Schema

PySpark provides the `StructType` and `StructField` classes (in `pyspark.sql.types`), which are used to define the columns of a PySpark DataFrame.

Using these classes, we can define the column names and the data types of the particular columns.

```python
# With an explicit schema, these names replace the names in the CSV header.
# The date column is named 'data' here (the CSV header calls it 'date'),
# and the rest of this tutorial refers to it as 'data'.
data_schema = [
    StructField('_c0', IntegerType(), True),
    StructField('symbol', StringType(), True),
    StructField('data', DateType(), True),
    StructField('open', DoubleType(), True),
    StructField('high', DoubleType(), True),
    StructField('low', DoubleType(), True),
    StructField('close', DoubleType(), True),
    StructField('volume', IntegerType(), True),
    StructField('adjusted', DoubleType(), True),
    StructField('market.cap', StringType(), True),
    StructField('sector', StringType(), True),
    StructField('industry', StringType(), True),
    StructField('exchange', StringType(), True)
]

final_struc = StructType(fields=data_schema)
```

```python
data = spark.read.csv(
    "stocks_price_final.csv",
    sep=',',
    header=True,
    schema=final_struc
)

data.printSchema()
```

```
root
 |-- _c0: integer (nullable = true)
 |-- symbol: string (nullable = true)
 |-- data: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjusted: double (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)
```



## Viewing Data

### show method
By default, `DataFrame.show()` displays the top 20 rows of a DataFrame; pass a number to show fewer or more rows.

```python
data.show(5)
```

```
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      data|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  5|   TXG|2019-09-18|56.849998|    62.2
```

The rows can also be shown vertically. This is useful when rows are too long to show horizontally.

```python
data.show(1, vertical=True)
```

```
-RECORD 0--------------------------
 _c0        | 1                    
 symbol     | TXG                  
 data       | 2019-09-12           
 open       | 54.0                 
 high       | 58.0                 
 low        | 51.0                 
 close      | 52.75                
 volume     | 7326300              
 adjusted   | 52.75                
 market.cap | $9.31B               
 sector     | Capital Goods        
 industry   | Biotechnology: La... 
 exchange   | NASDAQ               
only showing top 1 row
```



### describe

To show summary statistics (count, mean, standard deviation, minimum and maximum) of the DataFrame's columns, use the `describe()` method.

```python
data.select("open", "close", "high", "low").describe().show()
```

```
+-------+------------------+------------------+------------------+------------------+
|summary|              open|             close|              high|               low|
+-------+------------------+------------------+------------------+------------------+
|  count|           1726301|           1726301|           1726301|           1726301|
|   mean|15070.071703341047| 15032.71485433071|15555.067268137085|14557.808227578987|
| stddev|1111821.8002863186|1109755.9294000594|1148247.1953514975|1072968.1558434179|
|    min|             0.072|             0.071|             0.078|             0.052|
|    max|      1.60168176E8|      1.58376592E8|      1.61601456E8|      1.55151728E8|
+-------+------------------+------------------+------------------+------------------+
```



### Collect data

`DataFrame.collect()` collects the distributed data to the driver as local Python data.

**Note** that this can throw an out-of-memory error when the dataset is too large to fit on the driver, because all of the data is moved from the executors to the driver.

To avoid this, use `DataFrame.take()` or `DataFrame.tail()`, which return only the requested number of rows.

```python
# all_data = data.collect()  # may throw an out-of-memory error on large data

sub_data = data.take(5)
sub_data
```

```
[Row(_c0=1, symbol='TXG', data=datetime.date(2019, 9, 12), open=54.0, high=58.0, low=51.0, close=52.75, volume=7326300, adjusted=52.75, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=2, symbol='TXG', data=datetime.date(2019, 9, 13), open=52.75, high=54.355, low=49.150002, close=52.27, volume=1025200, adjusted=52.27, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=3, symbol='TXG', data=datetime.date(2019, 9, 16), open=52.450001, high=56.0, low=52.009998, close=55.200001, volume=269900, adjusted=55.200001, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=4, symbol='TXG', data=datetime.date(2019, 9, 17), open=56.209999, high=60.900002, low=55.423, close=56.779999, volume=602800, adjusted=56.779999, market.cap='$9.31B', sector='Capit
```

### To Pandas

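A PySpark DataFrame can also be converted to a pandas DataFrame with `toPandas()` to take advantage of the pandas API. Like `collect()`, this loads all of the data into the driver's memory, so use it only on data that fits there.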

```python
pd_data = data.toPandas()
pd_data.head()
```

| | _c0 | symbol | data | open | high | low | close | volume | adjusted | market.cap | sector | industry | exchange |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | TXG | 2019-09-12 | 54.0 | 58.0 | 51.0 | 52.75 | 7326300.0 | 52.75 | $9.31B | Capital Goods | Biotechnology: Laboratory Analytical Instruments | NASDAQ |
| 1 | 2 | TXG | 2019-09-13 | 52.75 | 54.355 | 49.150002 | 52.27 | 1025200.0 | 52.27 | $9.31B | Capital Goods | Biotechnology: Laboratory Analytical Instruments | NASDAQ |
| 2 | 3 | TXG | 2019-09-16 | 52.450001 | 56.0 | 52.009998 | 55.200001 | 269900.0 | 55.200001 | $9.31B | Capital Goods | Biotechnology: Laboratory Analytical Instruments | NASDAQ |
| 3 | 4 | TXG | 2019-09-17 | 56.209999 | 60.900002 | 55.423 | 56.779999 | 602800.0 | 56.779999 | $9.31B | Capital Goods | Biotechnology: Laboratory Analytical Instruments | NASDAQ |
| 4 | 5 | TXG | 2019-09-18 | 56.849998 | 62.27 | 55.650002 | 62.0 | 1589600.0 | 62.0 | $9.31B | Capital Goods | Biotechnology: Laboratory Analytical Instruments | NASDAQ |


# Basic Operations

## Creating new Column

A new column, for example one derived from an existing column, can be created using the `withColumn` method.

```python
data = data.withColumn('date', data.data)

data.show(5)
```

```
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+----------+
|_c0|symbol|      data|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|      date|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+----------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|2019-09-12|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|2019-09-13|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|2019-09-16|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotec
```

### Column Renaming

A column can be renamed using the `withColumnRenamed` method.

```python
data = data.withColumnRenamed('date', 'data_changed')

data.show(5)
```

```
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+------------+
|_c0|symbol|      data|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|data_changed|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+------------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|  2019-09-12|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|  2019-09-13|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|  2019-09-16|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital 
```

### Drop Column

A column can be dropped using the `drop` method.

```python
data = data.drop('data_changed')

data.show(5)
```

```
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      data|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  5|   TXG|2019-09-18|56.849998|    62.2
```

## Subsetting Data

Select only the columns you need from a DataFrame using `select()`.

```python
data.select(['open', 'high', 'low', 'close', 'volume', 'adjusted']).show(5)
```

```
+---------+---------+---------+---------+-------+---------+
|     open|     high|      low|    close| volume| adjusted|
+---------+---------+---------+---------+-------+---------+
|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|
|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|
|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|
|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|
|56.849998|    62.27|55.650002|     62.0|1589600|     62.0|
+---------+---------+---------+---------+-------+---------+
only showing top 5 rows
```



### Filter

To select a subset of rows, use `DataFrame.filter()` or its alias `DataFrame.where()`.

Selecting data for the **Health Care** sector only.

```python
health = data.filter(col('sector') == 'Health Care')

health.show(5)
```

```
+---+------+----------+----+----+----+-----+------+--------+----------+-----------+--------------------+--------+
|_c0|symbol|      data|open|high| low|close|volume|adjusted|market.cap|     sector|            industry|exchange|
+---+------+----------+----+----+----+-----+------+--------+----------+-----------+--------------------+--------+
|218|    YI|2019-01-02|6.02|6.11| 6.0|  6.0|  4100|     6.0|  $560.04M|Health Care|Medical/Nursing S...|  NASDAQ|
|219|    YI|2019-01-03|6.02|6.05|5.95| 6.05|  4700|    6.05|  $560.04M|Health Care|Medical/Nursing S...|  NASDAQ|
|220|    YI|2019-01-04|6.05|6.97|6.05| 6.78| 10300|    6.78|  $560.04M|Health Care|Medical/Nursing S...|  NASDAQ|
|221|    YI|2019-01-07| 6.8| 7.0|6.55|  7.0| 10100|     7.0|  $560.04M|Health Care|Medical/Nursing S...|  NASDAQ|
|222|    YI|2019-01-08| 7.0|9.76| 6.9| 9.55| 60300|    9.55|  $560.04M|Health Care|Medical/Nursing S...|  NASDAQ|
+---+------+----------+----+----+----+-----+------+--------+----------+-----------+-----
```

Selecting the `data` (date), `open`, `close` and `adjusted` columns for the **Technology** sector only.

```python
tech = data.where(col('sector') == 'Technology').select('data', 'open', 'close', 'adjusted')

tech.show(5)
```

```
+----------+----+-----+--------+
|      data|open|close|adjusted|
+----------+----+-----+--------+
|2019-01-02|8.51| 8.55|    8.55|
|2019-01-03| 8.5| 8.59|    8.59|
|2019-01-04|8.72| 8.88|    8.88|
|2019-01-07|8.88| 8.86|    8.86|
|2019-01-08|8.93|  9.4|     9.4|
+----------+----+-----+--------+
only showing top 5 rows
```



- `lit()` - creates a Column holding a literal (constant) value. It is commonly used to add a constant column to a DataFrame or, as below, to compare a column against a constant.
- `col()` - returns a Column based on the given column name.

```python
data.filter( (col('data') >= lit('2020-01-01')) & (col('data') <= lit('2020-01-31')) ).show(5)
```

```
+---+------+----------+---------+---------+---------+---------+------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      data|     open|     high|      low|    close|volume| adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+------+---------+----------+-------------+--------------------+--------+
| 78|   TXG|2020-01-02|76.910004|77.989998|71.480003|72.830002|220200|72.830002|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
| 79|   TXG|2020-01-03|71.519997|76.188004|70.580002|75.559998|288300|75.559998|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
| 80|   TXG|2020-01-06|75.269997|77.349998|73.559998|75.550003|220600|75.550003|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
| 81|   TXG|2020-01-07|     76.0|77.279999|    75.32|75.980003|182400|75.980003|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
| 82|   TXG|2020-01-08|76.089996|76.949997|72.73
```

### Range Filtering

`Column.between(lower, upper)` keeps rows whose value lies between the two bounds, inclusive.

```python
data.filter(data.adjusted.between(100.0, 500.0)).show(5)
```

```
+----+------+----------+----------+----------+----------+----------+------+----------+----------+-------------+--------------------+--------+
| _c0|symbol|      data|      open|      high|       low|     close|volume|  adjusted|market.cap|       sector|            industry|exchange|
+----+------+----------+----------+----------+----------+----------+------+----------+----------+-------------+--------------------+--------+
|  93|   TXG|2020-01-24| 95.459999|     101.0| 94.157997|100.790001|328100|100.790001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  94|   TXG|2020-01-27| 99.760002|104.892998| 97.019997|103.209999|334900|103.209999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  95|   TXG|2020-01-28|104.620003|108.269997|103.297997|106.620003|245400|106.620003|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|6893|  ABMD|2019-01-02|315.940002|320.709991|307.029999|309.959991|590000|309.959991|   $13.39B|  Health Care|Medical/Dental In...|  NASDAQ|
|6894|
```

### Case Statement

You can perform a SQL-style CASE statement using `when()` and `otherwise()`.

```python
data.select('open', 'close', when(data.close >= 60.0, 1).otherwise(0)).show(10)
```

```
+---------+---------+-------------------------------------------+
|     open|    close|CASE WHEN (close >= 60.0) THEN 1 ELSE 0 END|
+---------+---------+-------------------------------------------+
|     54.0|    52.75|                                          0|
|    52.75|    52.27|                                          0|
|52.450001|55.200001|                                          0|
|56.209999|56.779999|                                          0|
|56.849998|     62.0|                                          1|
|62.810001|61.119999|                                          1|
|61.709999|     60.5|                                          1|
|60.220001|60.330002|                                          1|
|     61.0|54.299999|                                          0|
|54.459999|52.759998|                                          0|
+---------+---------+-------------------------------------------+
only showing top 10 rows
```



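### Regex Matching using `rlike`

`Column.rlike()` returns `true` for rows where the column matches a regular expression (Java regex syntax).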

```python
# '^[BC]' matches values starting with B or C (a comma inside [...] would be matched literally).
data.select('sector',
            data.sector.rlike('^[BC]').alias('Sector Starting with B or C')
            ).distinct().show()
```

```
+--------------------+---------------------------+
|              sector|Sector Starting with B or C|
+--------------------+---------------------------+
|         Health Care|                      false|
|       Capital Goods|                       true|
|Consumer Non-Dura...|                       true|
|    Public Utilities|                      false|
|   Consumer Durables|                       true|
|             Finance|                      false|
|      Transportation|                      false|
|       Miscellaneous|                      false|
|   Consumer Services|                       true|
|              Energy|                      false|
|    Basic Industries|                       true|
|          Technology|                      false|
+--------------------+---------------------------+
```



# Grouping Data

A PySpark DataFrame also supports grouped data using the common split-apply-combine strategy: it groups the data by a condition, applies a function to each group, and then combines the results back into a DataFrame.

Available Aggregations:

| Function | Description |
| --- | --- |
| `count()` | Returns the count of rows for each group. |
| `mean()` | Returns the mean of values for each group. |
| `max()` | Returns the maximum of values for each group. |
| `min()` | Returns the minimum of values for each group. |
| `sum()` | Returns the total of values for each group. |
| `avg()` | Returns the average of values for each group (same as `mean()`). |
| `agg()` | Computes more than one aggregate at a time. |
| `pivot()` | Pivots a column so that its distinct values become separate columns. |

## Count

Counts the number of records in each group.

```python
data.groupBy('sector').count().show()
```

```
+--------------------+------+
|              sector| count|
+--------------------+------+
|       Miscellaneous| 50221|
|         Health Care|316175|
|    Public Utilities| 72836|
|              Energy| 87494|
|Consumer Non-Dura...| 78080|
|             Finance|303180|
|    Basic Industries| 97323|
|       Capital Goods|133122|
|          Technology|229799|
|   Consumer Services|272393|
|   Consumer Durables| 48404|
|      Transportation| 40007|
+--------------------+------+
```



## Mean

`mean()` returns the average of every numeric column for each group.

```python
data.select(['sector', 'open', 'close', 'adjusted']).groupBy('sector').mean().show()
```

```
+--------------------+------------------+------------------+------------------+
|              sector|         avg(open)|        avg(close)|     avg(adjusted)|
+--------------------+------------------+------------------+------------------+
|       Miscellaneous| 52.03839496900667| 52.06362854950963| 51.80973033632302|
|         Health Care|119.96763306523248|119.07806125419023|118.97394778016215|
|    Public Utilities|  35.5807773523944| 35.58528245861939| 34.73015568500465|
|              Energy| 24.45658989126103|24.427350302157844|23.684714263000885|
|Consumer Non-Dura...| 43.32860274612681|43.330386013645914| 42.81762456569027|
|             Finance|37.774667068190475|37.779002314288824| 37.10028522718037|
|    Basic Industries|266410.35470107093| 265750.3613671152|263865.51070311887|
|       Capital Goods| 60.48854363282767| 60.51655483568793| 59.97512253879296|
|          Technology|49.516045118394906| 49.53479888748223| 49.25234033754494|
|   Consumer Services| 55.07886734259143
```

## Aggregations

Using `agg()` function, we can calculate more than one aggregate at a time.

```python
data.groupBy('sector').agg(
    min("data").alias("From"),
    max("data").alias("To"),
    min("open").alias("Minimum Opening"),
    max("open").alias("Maxmum Opening"),
    avg("open").alias("Average Opening"),
    min("close").alias("Minimum Closing"),
    max("close").alias("Maximum Closing"),
    avg("close").alias("Average Closing"),
).show(truncate=False)
```

```
+---------------------+----------+----------+---------------+--------------+------------------+---------------+---------------+------------------+
|sector               |From      |To        |Minimum Opening|Maxmum Opening|Average Opening   |Minimum Closing|Maximum Closing|Average Closing   |
+---------------------+----------+----------+---------------+--------------+------------------+---------------+---------------+------------------+
|Miscellaneous        |2019-01-02|2020-07-22|0.147          |1059.98999    |52.03839496900667 |0.1361         |1035.829956    |52.06362854950963 |
|Health Care          |2019-01-02|2020-07-22|0.072          |186000.0      |119.96763306523248|0.071          |187000.0       |119.07806125419023|
|Public Utilities     |2019-01-02|2020-07-22|0.331          |280.0         |35.5807773523944  |0.325          |282.220001     |35.58528245861939 |
|Energy               |2019-01-02|2020-07-22|0.1            |905.0         |24.45658989126103 |0.09           |901.039
```

## Pivot

`pivot()` is an aggregation in which the distinct values of one grouping column are transposed into individual columns.

Here, `open` is summed for each `industry` (one row per industry) and each `exchange` (one column per exchange) in the `health` DataFrame.

```python
health.groupBy('industry').pivot('exchange').sum('open').show()
```

```
+--------------------+--------------------+------------------+
|            industry|              NASDAQ|              NYSE|
+--------------------+--------------------+------------------+
|    Ophthalmic Goods|  24940.150004000003|137920.41975700003|
|Biotechnology: Bi...|  1080056.3640190002|14947.620007000003|
|Biotechnology: El...|  210061.58897299992|122226.08995200001|
|Precision Instrum...|   9594.619975999998|              null|
|Medical/Nursing S...|  197619.64422999995|265969.70723499986|
|Hospital/Nursing ...|   61067.73431699995|156129.72005899984|
|Medical Specialities|  191181.31875899993| 602157.3348899995|
|Biotechnology: In...|  246407.79592300003|              null|
|Misc Health and B...|  15340.322973000004|43541.455038000044|
| Medical Electronics|  19621.877031000004|              null|
|Other Pharmaceuti...|  12111.344007999998|140996.56995299988|
|Medical/Dental In...|  1165266.6330440005| 767942.2839969998|
|Industrial Specia...|  366984.73284199974| 209960.1032
```

# Ordering, Duplicate and Null Handling

## Duplicate Handling

### distinct

The PySpark `distinct()` function returns a new DataFrame with duplicate rows (compared across all columns) removed.

```python
distinctdf = data.distinct()
print("Distinct count: "+str(distinctdf.count()))
distinctdf.show(5)
```

```
Distinct count: 1729034
+----+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
| _c0|symbol|      data|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+----+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
| 208|   TXG|2020-07-09|     92.0|92.400002|89.660004|92.290001| 447100|92.290001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
| 216|   TXG|2020-07-21|93.099998|95.269997|91.860001|95.209999|2306800|95.209999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
| 703|   PIH|2019-05-16|     5.44|     5.58|      5.4|     5.55|  14700|     5.55|   $27.43M|      Finance|Property-Casualty...|  NASDAQ|
| 761|   PIH|2019-08-08|     4.91|      5.2|     4.88|     5.02|   6600|     5.02|   $27.43M|      Finance|Property-Casualty...|  NASDAQ|
|1002|  TU
```

### dropDuplicates()

`dropDuplicates()` is used to drop rows based on selected (one or multiple) columns.

```python
dropDisDF = data.dropDuplicates(["sector","industry"])
print("Distinct count of sector and industry : "+str(dropDisDF.count()))
dropDisDF.show(5)
```

```
Distinct count of sector and industry : 208
+------+------+----------+---------+-----+---------+---------+-------+---------+----------+----------------+--------------------+--------+
|   _c0|symbol|      data|     open| high|      low|    close| volume| adjusted|market.cap|          sector|            industry|exchange|
+------+------+----------+---------+-----+---------+---------+-------+---------+----------+----------------+--------------------+--------+
|164990|  CLXT|2019-01-02|    10.27|10.67|    10.07|    10.39|  40700|    10.39|  $156.61M|Basic Industries|Agricultural Chem...|  NASDAQ|
|189134|  CENX|2019-01-02|     7.18| 7.85|     6.98|     7.55|1828900|     7.55|  $689.74M|Basic Industries|            Aluminum|  NASDAQ|
|990154|  YTEN|2019-01-02|34.799999| 36.0|30.799999|30.799999|   1300|30.799999|   $13.46M|Basic Industries|Containers/Packaging|  NASDAQ|
|213934|  CLSK|2019-01-02|     20.0| 24.0|     20.0|22.799999|  10737|22.799999|   $52.09M|Basic Industries|Electric Utili
```

## Null Handling

The following counts, for each column, the number of rows whose value is null or NaN.

```python
data_sub = data.select('sector', 'industry', 'open', 'close', 'adjusted', 'volume', 'exchange')

data_sub.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_sub.columns]).show()
```

```
+------+--------+----+-----+--------+------+--------+
|sector|industry|open|close|adjusted|volume|exchange|
+------+--------+----+-----+--------+------+--------+
|     0|       0|2733| 2733|    2733|  3827|       0|
+------+--------+----+-----+--------+------+--------+
```



### Dropping if any column contains null

```python
print(f"Before Drop count: {data_sub.count()}")
df = data_sub.na.drop()
print(f"After Drop count: {df.count()}")
print(f"Dropped count: {data_sub.count() - df.count()}")
```

```
Before Drop count: 1729034
After Drop count: 1725207
Dropped count: 3827
```


### Dropping if subset of columns contains null

In [30]:
print(f"Before Drop count: {data_sub.count()}")
df = data_sub.na.drop(subset=["open", "close"])
print(f"After Drop count: {df.count()}")
print(f"Dropped count: {data_sub.count() - df.count()}")

Before Drop count: 1729034
After Drop count: 1726301
Dropped count: 2733
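
Both cells call `na.drop()`, whose PySpark signature is `na.drop(how='any', thresh=None, subset=None)`. The plain-Python sketch below mirrors those rules on hypothetical rows:

- `how='any'` drops a row if any considered value is null or NaN.
- `how='all'` drops it only if all considered values are null or NaN.
- `thresh`, when given, keeps rows with at least that many non-null values and overrides `how`.

Read against the null counts above, the rules also explain the numbers. Dropping on every column removed exactly 3827 rows, which is the `volume` null count. So every row with a null `open`, `close` or `adjusted` must also have had a null `volume`.

```python
import math


def is_missing(v):
    return v is None or (isinstance(v, float) and math.isnan(v))


def na_drop(rows, how="any", thresh=None, subset=None):
    """Plain-Python sketch of DataFrame.na.drop(how, thresh, subset)."""
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        present = sum(not is_missing(row[c]) for c in cols)
        if thresh is not None:
            keep = present >= thresh      # thresh overrides how
        elif how == "any":
            keep = present == len(cols)   # drop if any value is missing
        elif how == "all":
            keep = present > 0            # drop only if every value is missing
        else:
            raise ValueError(f"how must be 'any' or 'all', got {how!r}")
        if keep:
            kept.append(row)
    return kept


rows = [  # hypothetical rows
    {"open": 1.0, "close": 1.1, "volume": 10},
    {"open": 2.0, "close": 2.1, "volume": None},   # only volume is missing
    {"open": None, "close": None, "volume": None},
]
print(len(na_drop(rows)))                            # 1
print(len(na_drop(rows, subset=["open", "close"])))  # 2
print(len(na_drop(rows, how="all")))                 # 2
```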


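### Filling null values with a constant

PySpark `DataFrame.na.fill(value, subset=None)` (also available as `fillna()`) replaces null/None values with a constant. When `value` is a number, it applies to all numeric columns (integer, long, float and double) and also replaces NaN. When `value` is a string, only string columns are filled. A dict such as `{"open": 0, "sector": "Unknown"}` sets a different replacement for each column.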

In [31]:
df = data_sub.na.fill(value=0)
df.select('sector', 'open', 'close', 'exchange').where(col("open") == 0).show(5)

+-------------+----+-----+--------+
|       sector|open|close|exchange|
+-------------+----+-----+--------+
|Miscellaneous| 0.0|  0.0|  NASDAQ|
|  Health Care| 0.0|  0.0|  NASDAQ|
|  Health Care| 0.0|  0.0|  NASDAQ|
|  Health Care| 0.0|  0.0|  NASDAQ|
|  Health Care| 0.0|  0.0|  NASDAQ|
+-------------+----+-----+--------+
only showing top 5 rows
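
The type-matching rule can be sketched in plain Python as follows. The column types in `SCHEMA` are hypothetical stand-ins for a DataFrame schema. Boolean fill values are left out for brevity.

```python
import math

# Hypothetical column types standing in for the DataFrame schema.
SCHEMA = {"sector": str, "open": float, "volume": int}


def is_missing(v):
    return v is None or (isinstance(v, float) and math.isnan(v))


def na_fill(rows, value, schema=SCHEMA):
    """Sketch of na.fill: a number fills numeric columns, a string fills
    string columns, and a dict maps column name -> replacement value."""
    if isinstance(value, dict):
        targets = value
    elif isinstance(value, (int, float)) and not isinstance(value, bool):
        targets = {c: value for c, t in schema.items() if t in (int, float)}
    elif isinstance(value, str):
        targets = {c: value for c, t in schema.items() if t is str}
    else:
        raise TypeError(f"unsupported fill value: {value!r}")
    filled = []
    for row in rows:
        new = dict(row)
        for c, replacement in targets.items():
            if is_missing(new[c]):
                new[c] = schema[c](replacement)  # cast to the column type, e.g. 0 -> 0.0
        filled.append(new)
    return filled


rows = [  # hypothetical rows
    {"sector": None, "open": None, "volume": None},
    {"sector": "Energy", "open": float("nan"), "volume": 3},
]
print(na_fill(rows, 0))
# [{'sector': None, 'open': 0.0, 'volume': 0}, {'sector': 'Energy', 'open': 0.0, 'volume': 3}]
print(na_fill(rows, {"sector": "Unknown"})[0]["sector"])  # Unknown
```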



## Ordering / Sorting

You can use either the `sort()` or the `orderBy()` method of a PySpark DataFrame to sort it in ascending or descending order on one or more columns. You can also sort with the PySpark SQL sorting functions, such as `asc()` and `desc()` from `pyspark.sql.functions`.

In [32]:
data.sort('industry') \
    .select('sector', 'industry', 'open', 'close') \
    .show()

+-------+--------------------+---------+---------+
| sector|            industry|     open|    close|
+-------+--------------------+---------+---------+
|Finance|Accident &Health ...|17.190001|    17.57|
|Finance|Accident &Health ...|     20.1|    20.16|
|Finance|Accident &Health ...|    17.49|17.139999|
|Finance|Accident &Health ...|17.379999|    17.66|
|Finance|Accident &Health ...|17.620001|     17.9|
|Finance|Accident &Health ...|    18.26|18.139999|
|Finance|Accident &Health ...|18.120001|18.540001|
|Finance|Accident &Health ...|     18.4|17.530001|
|Finance|Accident &Health ...|    17.41|    17.65|
|Finance|Accident &Health ...|17.540001|17.450001|
|Finance|Accident &Health ...|17.459999|    17.75|
|Finance|Accident &Health ...|    17.82|    18.52|
|Finance|Accident &Health ...|18.379999|    19.26|
|Finance|Accident &Health ...|    19.32|19.620001|
|Finance|Accident &Health ...|    19.41|    19.83|
|Finance|Accident &Health ...|    19.83|19.790001|
|Finance|Accident &Health ...| 

In [33]:
df.orderBy(col("open").desc(),col("sector")).show(5)

+----------------+---------------+------------+------------+------------+------+--------+
|          sector|       industry|        open|       close|    adjusted|volume|exchange|
+----------------+---------------+------------+------------+------------+------+--------+
|Basic Industries|Major Chemicals|1.60168176E8|1.58376592E8|1.57249392E8|     0|    NYSE|
|Basic Industries|Major Chemicals|1.59451552E8|1.56943312E8|1.55826304E8|     0|    NYSE|
|Basic Industries|Major Chemicals|1.57659952E8|1.45835456E8|1.44797504E8|     0|    NYSE|
|Basic Industries|Major Chemicals|  1.504936E8|1.44760512E8|1.43731152E8|     0|    NYSE|
|Basic Industries|Major Chemicals|1.46193776E8|1.40819008E8|1.39816768E8|     0|    NYSE|
+----------------+---------------+------------+------------+------------+------+--------+
only showing top 5 rows
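
`orderBy(col("open").desc(), col("sector"))` sorts by `open` in descending order and breaks ties by `sector` in ascending order. By default, Spark places nulls first in ascending sorts and last in descending ones. Column methods such as `asc_nulls_last()` and `desc_nulls_first()` override this. Below is a plain-Python sketch of that ordering on hypothetical rows. It relies on the fact that Python's sort is stable.

```python
def sort_rows(rows, keys):
    """keys: list of (column, ascending) pairs, most significant first.
    Nulls go first for ascending keys and last for descending ones."""
    result = list(rows)
    # Python's sort is stable, so sorting by the least significant key first
    # and the most significant key last yields a multi-key ordering.
    for col, ascending in reversed(keys):
        present = [r for r in result if r[col] is not None]
        nulls = [r for r in result if r[col] is None]
        present.sort(key=lambda r: r[col], reverse=not ascending)
        result = nulls + present if ascending else present + nulls
    return result


rows = [  # hypothetical rows
    {"sector": "B", "open": 1.0},
    {"sector": "A", "open": 2.0},
    {"sector": "C", "open": None},
    {"sector": "A", "open": 1.0},
]
ordered = sort_rows(rows, [("open", False), ("sector", True)])
print([(r["open"], r["sector"]) for r in ordered])
# [(2.0, 'A'), (1.0, 'A'), (1.0, 'B'), (None, 'C')]
```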



# Joins

A PySpark join combines two DataFrames, and by chaining joins you can combine several DataFrames. It supports the basic join types available in traditional SQL: INNER, FULL OUTER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI and CROSS. A DataFrame can also be joined with itself (a self join).

| Join String | Equivalent SQL Join |
| --- | --- |
| inner | INNER JOIN |
| outer, full, fullouter, full_outer | FULL OUTER JOIN |
| left, leftouter, left_outer | LEFT JOIN |
| right, rightouter, right_outer | RIGHT JOIN |
| cross | CROSS JOIN |
| anti, leftanti, left_anti | LEFT ANTI JOIN (Spark SQL) |
| semi, leftsemi, left_semi | LEFT SEMI JOIN (Spark SQL) |
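
Several spellings in the table are equivalent because, as we read Spark's join-type parsing, the string is lower-cased and stripped of underscores before it is matched. The helper below is hypothetical and only illustrates that mapping. The canonical names it returns are our own labels.

```python
# Hypothetical helper: canonical join type for each accepted spelling.
_JOIN_TYPES = {
    "inner": "inner",
    "outer": "full_outer", "full": "full_outer", "fullouter": "full_outer",
    "left": "left_outer", "leftouter": "left_outer",
    "right": "right_outer", "rightouter": "right_outer",
    "semi": "left_semi", "leftsemi": "left_semi",
    "anti": "left_anti", "leftanti": "left_anti",
    "cross": "cross",
}


def normalize_join_type(how):
    """Lower-case the string and drop underscores, then look it up."""
    key = how.lower().replace("_", "")
    try:
        return _JOIN_TYPES[key]
    except KeyError:
        raise ValueError(f"Unsupported join type: {how!r}") from None


print(normalize_join_type("left_outer"), normalize_join_type("FULL"), normalize_join_type("anti"))
# left_outer full_outer left_anti
```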

## Setting up Mock Data

In [34]:
product = [(101,"Watch","Fashion",299.0,"Delhi"), \
        (102,"Bag","Fashion",1350.0,"Mumbai"), \
        (103,"Shoes","Fashion",2999.0,"Chennai"), \
        (104,"Smartphone","Electronics",14999.0,"Kolkata"), \
        (105,"Books","Study",145.0,"Delhi"), \
        (106,"Oil","Grocery",110.0,"Chennai"), \
        (107,"Laptop","Electronics",79999.0,"Bengalore") \
]
productColumns = ["product_id","name","category","price", "seller_city"]

productDF = spark.createDataFrame(data=product, schema = productColumns)
productDF.printSchema()
productDF.show(truncate=False)

customer = [(1, "Olivia", 20, 101, "Watch", "Mumbai"), \
            (2, "Aditya", 25, 0, "NA", "Delhi"), \
            (3, "Cory", 15, 106, "Oil", "Bengalore"), \
            (4, "Isabell", 10, 0, "NA", "Chennai"), \
            (5, "Dominic", 30, 103, "Shoes", "Chennai"), \
            (6, "Tyler", 65, 104, "Smartphone", "Delhi"), \
            (7, "Samuel", 35, 0, "NA", "Kolkata"), \
            (8, "Daniel", 18, 0, "NA", "Delhi"), \
            (9, "Jeremy", 23, 107, "Laptop", "Mumbai"), \
]
customerColumns = ["id","name", "age", "product_id", "purchased_product", "city"]
customerDF = spark.createDataFrame(data=customer, schema = customerColumns)
customerDF.printSchema()
customerDF.show(truncate=False)


root
 |-- product_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: double (nullable = true)
 |-- seller_city: string (nullable = true)

+----------+----------+-----------+-------+-----------+
|product_id|name      |category   |price  |seller_city|
+----------+----------+-----------+-------+-----------+
|101       |Watch     |Fashion    |299.0  |Delhi      |
|102       |Bag       |Fashion    |1350.0 |Mumbai     |
|103       |Shoes     |Fashion    |2999.0 |Chennai    |
|104       |Smartphone|Electronics|14999.0|Kolkata    |
|105       |Books     |Study      |145.0  |Delhi      |
|106       |Oil       |Grocery    |110.0  |Chennai    |
|107       |Laptop    |Electronics|79999.0|Bengalore  |
+----------+----------+-----------+-------+-----------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- product_id: long (nullable = true)
 |-- purchased_product: string (nul

## Inner Join

An inner join combines two datasets on key columns; rows whose keys do not match are dropped from both datasets.

In [35]:
innerDF = productDF.join(customerDF, productDF.product_id == customerDF.product_id, "inner")
innerDF.show()

+----------+----------+-----------+-------+-----------+---+-------+---+----------+-----------------+---------+
|product_id|      name|   category|  price|seller_city| id|   name|age|product_id|purchased_product|     city|
+----------+----------+-----------+-------+-----------+---+-------+---+----------+-----------------+---------+
|       101|     Watch|    Fashion|  299.0|      Delhi|  1| Olivia| 20|       101|            Watch|   Mumbai|
|       103|     Shoes|    Fashion| 2999.0|    Chennai|  5|Dominic| 30|       103|            Shoes|  Chennai|
|       104|Smartphone|Electronics|14999.0|    Kolkata|  6|  Tyler| 65|       104|       Smartphone|    Delhi|
|       106|       Oil|    Grocery|  110.0|    Chennai|  3|   Cory| 15|       106|              Oil|Bengalore|
|       107|    Laptop|Electronics|79999.0|  Bengalore|  9| Jeremy| 23|       107|           Laptop|   Mumbai|
+----------+----------+-----------+-------+-----------+---+-------+---+----------+-----------------+---------+
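
The result can be reproduced with a plain-Python hash-join sketch on the `product_id` values of the mock data. This only models the result. Spark picks its own physical strategy, such as a broadcast or sort-merge join.

```python
def inner_join(left, right, key):
    """Hash join: index the right side by key, then probe it with each left row."""
    index = {}
    for rrow in right:
        if rrow[key] is not None:  # null keys never match under ==
            index.setdefault(rrow[key], []).append(rrow)
    return [(lrow, rrow) for lrow in left for rrow in index.get(lrow[key], [])]


products = [{"product_id": p} for p in range(101, 108)]
customers = [{"id": i, "product_id": p} for i, p in
             [(1, 101), (2, 0), (3, 106), (4, 0), (5, 103), (6, 104), (7, 0), (8, 0), (9, 107)]]
pairs = inner_join(products, customers, "product_id")
print([lrow["product_id"] for lrow, _ in pairs])  # [101, 103, 104, 106, 107]
```

Because the join condition compares two columns, the result keeps both `product_id` columns. Passing the column name instead, as in `productDF.join(customerDF, "product_id", "inner")`, keeps a single `product_id` column, though the two `name` columns would still need renaming.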



## Outer Join

An outer join (also called full or fullouter) returns all rows from both datasets. Where the join expression does not match, the columns from the other dataset are null for that record.

In [36]:
outerDF = productDF.join(customerDF, productDF.product_id == customerDF.product_id, "outer")
outerDF.show()

+----------+----------+-----------+-------+-----------+----+-------+----+----------+-----------------+---------+
|product_id|      name|   category|  price|seller_city|  id|   name| age|product_id|purchased_product|     city|
+----------+----------+-----------+-------+-----------+----+-------+----+----------+-----------------+---------+
|      null|      null|       null|   null|       null|   2| Aditya|  25|         0|               NA|    Delhi|
|      null|      null|       null|   null|       null|   4|Isabell|  10|         0|               NA|  Chennai|
|      null|      null|       null|   null|       null|   7| Samuel|  35|         0|               NA|  Kolkata|
|      null|      null|       null|   null|       null|   8| Daniel|  18|         0|               NA|    Delhi|
|       101|     Watch|    Fashion|  299.0|      Delhi|   1| Olivia|  20|       101|            Watch|   Mumbai|
|       102|       Bag|    Fashion| 1350.0|     Mumbai|null|   null|null|      null|            
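
A plain-Python sketch of the full outer join on the mock `product_id` values. In each returned pair, `None` stands for an all-null side.

```python
def full_outer_join(left, right, key):
    """Return (left_row, right_row) pairs; None stands for an all-null side."""
    index = {}
    for j, rrow in enumerate(right):
        if rrow[key] is not None:  # null keys never match under ==
            index.setdefault(rrow[key], []).append(j)
    matched = set()  # positions of right rows that found a partner
    out = []
    for lrow in left:
        js = index.get(lrow[key], [])
        for j in js:
            matched.add(j)
            out.append((lrow, right[j]))
        if not js:
            out.append((lrow, None))  # unmatched left row
    out.extend((None, rrow) for j, rrow in enumerate(right) if j not in matched)  # unmatched right rows
    return out


products = [{"product_id": p} for p in range(101, 108)]
customers = [{"id": i, "product_id": p} for i, p in
             [(1, 101), (2, 0), (3, 106), (4, 0), (5, 103), (6, 104), (7, 0), (8, 0), (9, 107)]]
rows = full_outer_join(products, customers, "product_id")
print(len(rows))  # 11: 5 matches + 2 unmatched products + 4 unmatched customers
```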

## Left Outer Join

A left join (also called leftouter) returns all rows from the left dataset whether or not a match is found in the right dataset. Where the join expression does not match, the right-hand columns of that record are null, and right-hand records without a match are dropped.

In [37]:
leftouterDF = productDF.join(customerDF, productDF.product_id == customerDF.product_id, "left")
leftouterDF.show()

+----------+----------+-----------+-------+-----------+----+-------+----+----------+-----------------+---------+
|product_id|      name|   category|  price|seller_city|  id|   name| age|product_id|purchased_product|     city|
+----------+----------+-----------+-------+-----------+----+-------+----+----------+-----------------+---------+
|       101|     Watch|    Fashion|  299.0|      Delhi|   1| Olivia|  20|       101|            Watch|   Mumbai|
|       102|       Bag|    Fashion| 1350.0|     Mumbai|null|   null|null|      null|             null|     null|
|       103|     Shoes|    Fashion| 2999.0|    Chennai|   5|Dominic|  30|       103|            Shoes|  Chennai|
|       104|Smartphone|Electronics|14999.0|    Kolkata|   6|  Tyler|  65|       104|       Smartphone|    Delhi|
|       105|     Books|      Study|  145.0|      Delhi|null|   null|null|      null|             null|     null|
|       106|       Oil|    Grocery|  110.0|    Chennai|   3|   Cory|  15|       106|            
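
In a plain-Python sketch, the left join is an inner join where an unmatched left row is paired with a single all-null right row, represented here by `None`.

```python
def left_join(left, right, key):
    index = {}
    for rrow in right:
        if rrow[key] is not None:  # null keys never match under ==
            index.setdefault(rrow[key], []).append(rrow)
    # [None] stands in for one all-null right row when nothing matches
    return [(lrow, rrow) for lrow in left for rrow in index.get(lrow[key], [None])]


products = [{"product_id": p} for p in range(101, 108)]
customers = [{"id": i, "product_id": p} for i, p in
             [(1, 101), (2, 0), (3, 106), (4, 0), (5, 103), (6, 104), (7, 0), (8, 0), (9, 107)]]
rows = left_join(products, customers, "product_id")
print([lrow["product_id"] for lrow, rrow in rows if rrow is None])  # [102, 105]
```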

## Right Outer Join

A right join (also called rightouter) is the opposite of a left join: it returns all rows from the right dataset whether or not a match is found in the left dataset. Where the join expression does not match, the left-hand columns of that record are null, and left-hand records without a match are dropped.

In [38]:
rightouterDF = productDF.join(customerDF, productDF.product_id == customerDF.product_id, "right")
rightouterDF.show()

+----------+----------+-----------+-------+-----------+---+-------+---+----------+-----------------+---------+
|product_id|      name|   category|  price|seller_city| id|   name|age|product_id|purchased_product|     city|
+----------+----------+-----------+-------+-----------+---+-------+---+----------+-----------------+---------+
|       101|     Watch|    Fashion|  299.0|      Delhi|  1| Olivia| 20|       101|            Watch|   Mumbai|
|      null|      null|       null|   null|       null|  2| Aditya| 25|         0|               NA|    Delhi|
|       106|       Oil|    Grocery|  110.0|    Chennai|  3|   Cory| 15|       106|              Oil|Bengalore|
|      null|      null|       null|   null|       null|  4|Isabell| 10|         0|               NA|  Chennai|
|       103|     Shoes|    Fashion| 2999.0|    Chennai|  5|Dominic| 30|       103|            Shoes|  Chennai|
|       104|Smartphone|Electronics|14999.0|    Kolkata|  6|  Tyler| 65|       104|       Smartphone|    Delhi|
|
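
One way to see the symmetry is a plain-Python sketch: a right join is a left join with the inputs swapped, and each pair is flipped back so the left dataset's columns still come first.

```python
def left_join(left, right, key):
    index = {}
    for rrow in right:
        if rrow[key] is not None:  # null keys never match under ==
            index.setdefault(rrow[key], []).append(rrow)
    return [(lrow, rrow) for lrow in left for rrow in index.get(lrow[key], [None])]


def right_join(left, right, key):
    # Swap the inputs, then swap each pair back to (left_row, right_row).
    return [(lrow, rrow) for rrow, lrow in left_join(right, left, key)]


products = [{"product_id": p} for p in range(101, 108)]
customers = [{"id": i, "product_id": p} for i, p in
             [(1, 101), (2, 0), (3, 106), (4, 0), (5, 103), (6, 104), (7, 0), (8, 0), (9, 107)]]
rows = right_join(products, customers, "product_id")
print([rrow["id"] for lrow, rrow in rows if lrow is None])  # [2, 4, 7, 8]
```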

## Left Semi Join 

A leftsemi join is similar to an inner join, except that it returns only the columns of the left dataset and ignores all columns of the right dataset. In other words, it returns the left-dataset records that have a match in the right dataset on the join expression; records without a match are dropped. Unlike an inner join, each left record is returned at most once, even if it matches several right records.

In [39]:
leftsemiDF = productDF.join(customerDF, productDF.product_id == customerDF.product_id, "leftsemi")
leftsemiDF.show()

+----------+----------+-----------+-------+-----------+
|product_id|      name|   category|  price|seller_city|
+----------+----------+-----------+-------+-----------+
|       101|     Watch|    Fashion|  299.0|      Delhi|
|       103|     Shoes|    Fashion| 2999.0|    Chennai|
|       104|Smartphone|Electronics|14999.0|    Kolkata|
|       106|       Oil|    Grocery|  110.0|    Chennai|
|       107|    Laptop|Electronics|79999.0|  Bengalore|
+----------+----------+-----------+-------+-----------+
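
A plain-Python sketch of the semi join only needs the set of right-hand keys. To show the at-most-once behaviour, the customer list below adds a hypothetical customer 10 who also bought product 101.

```python
def left_semi_join(left, right, key):
    """Left rows with at least one match on the right; right columns are dropped."""
    right_keys = {rrow[key] for rrow in right if rrow[key] is not None}
    return [lrow for lrow in left if lrow[key] in right_keys]


products = [{"product_id": p} for p in range(101, 108)]
customers = [{"id": i, "product_id": p} for i, p in
             [(1, 101), (2, 0), (3, 106), (4, 0), (5, 103), (6, 104), (7, 0), (8, 0), (9, 107),
              (10, 101)]]  # customer 10 is hypothetical: a second buyer of product 101
semi = left_semi_join(products, customers, "product_id")
print([p["product_id"] for p in semi])  # [101, 103, 104, 106, 107]
```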



## Left Anti Join

A leftanti join does the exact opposite of a leftsemi join: it returns only the columns of the left dataset, for the records that have no match in the right dataset.

In [40]:
leftantiDF = productDF.join(customerDF, productDF.product_id == customerDF.product_id, "leftanti")
leftantiDF.show()

+----------+-----+--------+------+-----------+
|product_id| name|category| price|seller_city|
+----------+-----+--------+------+-----------+
|       102|  Bag| Fashion|1350.0|     Mumbai|
|       105|Books|   Study| 145.0|      Delhi|
+----------+-----+--------+------+-----------+
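
The plain-Python sketch of the anti join simply negates the semi-join membership test. It assumes standard SQL null semantics for the `==` condition.

```python
def left_anti_join(left, right, key):
    """Left rows with no match on the right; right columns are dropped."""
    right_keys = {rrow[key] for rrow in right if rrow[key] is not None}
    # A null key never satisfies ==, so such a left row counts as unmatched.
    return [lrow for lrow in left if lrow[key] not in right_keys]


products = [{"product_id": p} for p in range(101, 108)]
customers = [{"id": i, "product_id": p} for i, p in
             [(1, 101), (2, 0), (3, 106), (4, 0), (5, 103), (6, 104), (7, 0), (8, 0), (9, 107)]]
anti = left_anti_join(products, customers, "product_id")
print([p["product_id"] for p in anti])  # [102, 105]
```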



# PySpark Window Functions

PySpark window functions operate on a group of rows (a partition, or a frame within it) and return a single value for every input row, unlike aggregate functions, which return one value per group.

| Window Functions Usage & Syntax | PySpark Window Functions description |
| --- | --- |
| row_number(): Column | Returns a sequential number starting from 1 within a window partition. |
| rank(): Column | Returns the rank of rows within a window partition, with gaps after ties. |
| percent_rank(): Column | Returns the relative (percentile) rank of rows within a window partition, computed as (rank - 1) / (number of rows in the partition - 1). |
| dense_rank(): Column | Returns the rank of rows within a window partition without any gaps, whereas rank() leaves gaps after ties. |
| ntile(n: Int): Column | Splits the ordered window partition into `n` roughly equal buckets and returns the bucket id (1 to `n`) of each row. |
| cume_dist(): Column | Returns the cumulative distribution of values within a window partition, i.e. the fraction of rows whose value is less than or equal to the current row's value. |
| lag(columnName: String, offset: Int, defaultValue: Any): Column | Returns the value `offset` rows before the current row, or `defaultValue` (null if not given) when there are fewer than `offset` rows before the current row. |
| lead(columnName: String, offset: Int, defaultValue: Any): Column | Returns the value `offset` rows after the current row, or `defaultValue` (null if not given) when there are fewer than `offset` rows after the current row. |
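
The table's rules can be checked on a small example. The plain-Python sketch below is not Spark's implementation. It computes several of these functions over one already-ordered partition of hypothetical `open` prices. `lag` and `lead` use an offset of 1 and a null default, matching PySpark's `lag(col, offset=1, default=None)` defaults.

```python
from bisect import bisect_right


def window_stats(values, n_tiles=2):
    """Ranking and offset functions over one window partition.
    `values` must already be sorted, like the window's orderBy column."""
    n = len(values)
    if n == 0:
        return {}
    ranks = [values.index(v) + 1 for v in values]  # 1 + rows before the first tie
    percent_rank = [0.0 if n == 1 else (r - 1) / (n - 1) for r in ranks]
    cume_dist = [bisect_right(values, v) / n for v in values]  # share of rows <= v
    lag = [None] + values[:-1]   # offset 1, default None
    lead = values[1:] + [None]
    base, extra = divmod(n, n_tiles)
    ntile = []
    for bucket in range(1, n_tiles + 1):
        # the first `extra` buckets take one extra row each
        ntile += [bucket] * (base + (1 if bucket <= extra else 0))
    return {"rank": ranks, "percent_rank": percent_rank, "cume_dist": cume_dist,
            "lag": lag, "lead": lead, "ntile": ntile}


stats = window_stats([0.63, 0.64, 0.67, 0.67, 0.70])
print(stats["percent_rank"])  # [0.0, 0.25, 0.5, 0.5, 1.0]
print(stats["cume_dist"])     # [0.2, 0.4, 0.8, 0.8, 1.0]
print(stats["ntile"])         # [1, 1, 1, 2, 2]
```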

In [41]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank

## row_number

The row_number() window function assigns a sequential number, starting from 1, to each row within a window partition.

In [42]:
windowSpec = Window.partitionBy("industry").orderBy("open")

data.withColumn("row_number",row_number().over(windowSpec)) \
    .select("industry", "sector", "open", "row_number").show(truncate=False)

+----------------------+----------------+----+----------+
|industry              |sector          |open|row_number|
+----------------------+----------------+----+----------+
|Agricultural Chemicals|Basic Industries|0.63|1         |
|Agricultural Chemicals|Basic Industries|0.64|2         |
|Agricultural Chemicals|Basic Industries|0.65|3         |
|Agricultural Chemicals|Basic Industries|0.67|4         |
|Agricultural Chemicals|Basic Industries|0.67|5         |
|Agricultural Chemicals|Basic Industries|0.67|6         |
|Agricultural Chemicals|Basic Industries|0.68|7         |
|Agricultural Chemicals|Basic Industries|0.69|8         |
|Agricultural Chemicals|Basic Industries|0.69|9         |
|Agricultural Chemicals|Basic Industries|0.7 |10        |
|Agricultural Chemicals|Basic Industries|0.7 |11        |
|Agricultural Chemicals|Basic Industries|0.7 |12        |
|Agricultural Chemicals|Basic Industries|0.7 |13        |
|Agricultural Chemicals|Basic Industries|0.7 |14        |
|Agricultural 
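
Here is a plain-Python sketch of `Window.partitionBy("industry").orderBy("open")` combined with `row_number()`, on hypothetical rows. It groups the rows by the partition column, orders each group, and numbers it from 1. Among tied `open` values, this numbering follows Python's stable sort, whereas Spark does not guarantee an order between ties.

```python
from itertools import groupby
from operator import itemgetter


def with_row_number(rows, partition_col, order_col):
    """Number rows 1, 2, 3, ... within each partition, in order_col order."""
    ordered = sorted(rows, key=itemgetter(partition_col, order_col))
    out = []
    for _, part in groupby(ordered, key=itemgetter(partition_col)):
        for n, row in enumerate(part, start=1):  # numbering restarts per partition
            out.append({**row, "row_number": n})
    return out


rows = [  # hypothetical rows
    {"industry": "Aluminum", "open": 7.18},
    {"industry": "Agricultural Chemicals", "open": 0.64},
    {"industry": "Agricultural Chemicals", "open": 0.63},
    {"industry": "Aluminum", "open": 6.5},
]
for r in with_row_number(rows, "industry", "open"):
    print(r["industry"], r["open"], r["row_number"])
```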

## rank

The rank() window function assigns a rank to each row within a window partition. It leaves gaps in the ranking when there are ties.

In [43]:
data.withColumn("rank",rank().over(windowSpec)) \
    .select("industry", "sector", "open", "rank").show(truncate=False)

+----------------------+----------------+----+----+
|industry              |sector          |open|rank|
+----------------------+----------------+----+----+
|Agricultural Chemicals|Basic Industries|0.63|1   |
|Agricultural Chemicals|Basic Industries|0.64|2   |
|Agricultural Chemicals|Basic Industries|0.65|3   |
|Agricultural Chemicals|Basic Industries|0.67|4   |
|Agricultural Chemicals|Basic Industries|0.67|4   |
|Agricultural Chemicals|Basic Industries|0.67|4   |
|Agricultural Chemicals|Basic Industries|0.68|7   |
|Agricultural Chemicals|Basic Industries|0.69|8   |
|Agricultural Chemicals|Basic Industries|0.69|8   |
|Agricultural Chemicals|Basic Industries|0.7 |10  |
|Agricultural Chemicals|Basic Industries|0.7 |10  |
|Agricultural Chemicals|Basic Industries|0.7 |10  |
|Agricultural Chemicals|Basic Industries|0.7 |10  |
|Agricultural Chemicals|Basic Industries|0.7 |10  |
|Agricultural Chemicals|Basic Industries|0.7 |10  |
|Agricultural Chemicals|Basic Industries|0.71|16  |
|Agricultura
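
The rank values above can be reproduced with a short plain-Python sketch over the same already-sorted `open` values shown in the output. A tie reuses the previous rank, and the next distinct value jumps to its 1-based position, which creates the gap.

```python
def rank(sorted_values):
    ranks = []
    for i, v in enumerate(sorted_values):
        if i > 0 and v == sorted_values[i - 1]:
            ranks.append(ranks[-1])  # tie: reuse the previous rank
        else:
            ranks.append(i + 1)      # new value: rank = position, leaving a gap after ties
    return ranks


opens = [0.63, 0.64, 0.65, 0.67, 0.67, 0.67, 0.68, 0.69, 0.69] + [0.7] * 6 + [0.71]
print(rank(opens))
# [1, 2, 3, 4, 4, 4, 7, 8, 8, 10, 10, 10, 10, 10, 10, 16]
```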

## dense_rank

The dense_rank() window function ranks rows within a window partition without leaving any gaps. *It is similar to rank(), the difference being that rank() leaves gaps in the ranking when there are ties.*

In [44]:
data.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .select("industry", "sector", "open", "dense_rank").show(truncate=False)

+----------------------+----------------+----+----------+
|industry              |sector          |open|dense_rank|
+----------------------+----------------+----+----------+
|Agricultural Chemicals|Basic Industries|0.63|1         |
|Agricultural Chemicals|Basic Industries|0.64|2         |
|Agricultural Chemicals|Basic Industries|0.65|3         |
|Agricultural Chemicals|Basic Industries|0.67|4         |
|Agricultural Chemicals|Basic Industries|0.67|4         |
|Agricultural Chemicals|Basic Industries|0.67|4         |
|Agricultural Chemicals|Basic Industries|0.68|5         |
|Agricultural Chemicals|Basic Industries|0.69|6         |
|Agricultural Chemicals|Basic Industries|0.69|6         |
|Agricultural Chemicals|Basic Industries|0.7 |7         |
|Agricultural Chemicals|Basic Industries|0.7 |7         |
|Agricultural Chemicals|Basic Industries|0.7 |7         |
|Agricultural Chemicals|Basic Industries|0.7 |7         |
|Agricultural Chemicals|Basic Industries|0.7 |7         |
|Agricultural 
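
For comparison with rank(), here is the same plain-Python sketch adapted to dense_rank(), over the `open` values shown in the outputs. A new value increments the previous rank by one instead of jumping to its position.

```python
def dense_rank(sorted_values):
    ranks = []
    for i, v in enumerate(sorted_values):
        if i == 0:
            ranks.append(1)
        elif v == sorted_values[i - 1]:
            ranks.append(ranks[-1])      # tie: reuse the previous rank
        else:
            ranks.append(ranks[-1] + 1)  # new value: next consecutive rank, no gap
    return ranks


opens = [0.63, 0.64, 0.65, 0.67, 0.67, 0.67, 0.68, 0.69, 0.69] + [0.7] * 6 + [0.71]
print(dense_rank(opens))
# [1, 2, 3, 4, 4, 4, 5, 6, 6, 7, 7, 7, 7, 7, 7, 8]
```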

# PySpark SQL API

The `sql` method of a SparkSession (`spark.sql(query)`) lets applications run SQL queries programmatically and returns the result as a DataFrame.

## Registering DataFrames as Views

To run SQL queries on a DataFrame, **you first have to register it as a temporary view**. The view lives only as long as the SparkSession that created it:

In [45]:
data.createOrReplaceTempView("stocks")

To list the views registered so far, use `spark.catalog.listTables()`:

In [46]:
spark.catalog.listTables()

[Table(name='stocks', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

## Running SQL Queries

Once the view is registered, you can run any SQL query against it with the `spark.sql(query)` method.

In [47]:
select_star = spark.sql("SELECT * FROM stocks")
select_star.show(5)

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      data|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  5|   TXG|2019-09-18|56.849998|    62.2

## Where clause

In [48]:
df = spark.sql("""SELECT sector, industry, open, close FROM stocks where exchange = 'NASDAQ' and open > 50""")
df.show(5)

+-------------+--------------------+---------+---------+
|       sector|            industry|     open|    close|
+-------------+--------------------+---------+---------+
|Capital Goods|Biotechnology: La...|     54.0|    52.75|
|Capital Goods|Biotechnology: La...|    52.75|    52.27|
|Capital Goods|Biotechnology: La...|52.450001|55.200001|
|Capital Goods|Biotechnology: La...|56.209999|56.779999|
|Capital Goods|Biotechnology: La...|56.849998|     62.0|
+-------------+--------------------+---------+---------+
only showing top 5 rows



## Group By

In [49]:
df = spark.sql("""SELECT sector, avg(open), max(close) FROM stocks group by 1""")
df.show(5)

+--------------------+------------------+-----------+
|              sector|         avg(open)| max(close)|
+--------------------+------------------+-----------+
|       Miscellaneous| 52.03839496900667|1035.829956|
|         Health Care|119.96763306523248|   187000.0|
|    Public Utilities|  35.5807773523944| 282.220001|
|              Energy| 24.45658989126103| 901.039978|
|Consumer Non-Dura...| 43.32860274612681| 664.130005|
+--------------------+------------------+-----------+
only showing top 5 rows



# Writing DataFrames

Once the data has been processed, you can write the final DataFrame in various data formats using the generic `DataFrame.write.format(format).save(path)` method. Spark writes `path` as a directory of part files (one per partition), not as a single file.

## Saving in CSV format

Save only the `Health Care` sector data in CSV format:

In [50]:
health.write.format('csv').save('health')

## Formats Supported

- Parquet Files
- ORC Files
- JSON Files
- CSV Files
- Text Files
- Hive Tables
- JDBC To Other Databases
- Avro Files, and many more

## Partitioning Large Files

Partitioning splits a large dataset into smaller pieces. In memory, when you create a DataFrame from a file or table, PySpark splits it into a number of partitions based on parameters such as the input size. Transformations then run in parallel, with each partition processed by its own task; this parallelism is one of the main advantages of a PySpark DataFrame over a pandas DataFrame. On disk, `DataFrameWriter.partitionBy()` splits the written output by one or more partition keys, as in the example below.

In [51]:
health.write.option("header",True) \
        .partitionBy("industry") \
        .csv("industry_health_data")

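This creates a separate folder for each industry, named `industry=<value>`, containing only the data for that industry. The `industry` column itself is encoded in the folder name rather than stored in the files. The output of the above cell will look like this: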

![industry wise output](capture.PNG)
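
The directory layout that `partitionBy("industry")` produces can be imitated with a plain-Python sketch using only the standard library. It writes one folder per distinct value, named `<column>=<value>`, and leaves the partition column out of the files. The sample rows and the `Pharma`/`Devices` values are hypothetical. Real Spark part files have longer generated names, and Spark also escapes special characters in folder names, which this sketch does not do.

```python
import csv
import tempfile
from pathlib import Path


def write_partitioned(rows, base_dir, partition_col, columns):
    """Write rows as CSV under base_dir/<partition_col>=<value>/, one directory
    per distinct value. The partition column lives only in the directory name."""
    base = Path(base_dir)
    data_cols = [c for c in columns if c != partition_col]
    groups = {}
    for row in rows:
        groups.setdefault(row[partition_col], []).append(row)
    for value, group in groups.items():
        part_dir = base / f"{partition_col}={value}"  # no escaping of special characters here
        part_dir.mkdir(parents=True, exist_ok=True)
        with open(part_dir / "part-00000.csv", "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(data_cols)  # header row, like option("header", True)
            writer.writerows([row[c] for c in data_cols] for row in group)
    return sorted(p.name for p in base.iterdir())


rows = [  # hypothetical sample rows
    {"symbol": "AAA", "industry": "Pharma", "open": 10.0},
    {"symbol": "BBB", "industry": "Devices", "open": 20.5},
    {"symbol": "CCC", "industry": "Pharma", "open": 3.25},
]
with tempfile.TemporaryDirectory() as tmp:
    dirs = write_partitioned(rows, tmp, "industry", ["symbol", "industry", "open"])
    with open(Path(tmp) / "industry=Pharma" / "part-00000.csv", newline="") as f:
        pharma = list(csv.reader(f))
print(dirs)    # ['industry=Devices', 'industry=Pharma']
print(pharma)  # [['symbol', 'open'], ['AAA', '10.0'], ['CCC', '3.25']]
```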

# References and Resources

- [PySpark Documentation](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html)
- [Spark by Examples](https://sparkbyexamples.com/pyspark/pyspark-partitionby-example/)
- [Spark Core Concepts](https://luminousmen.com/post/spark-core-concepts-explained)
- [PySpark by Databricks](https://databricks.com/glossary/pyspark)
- [PySpark SQL Cheat Sheet by DataCamp](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf)