# <font color='tomato'>Intermediate Guide to PySpark - PySpark SQL Functions with Examples</font>

## Installation

To run Spark in Colab, first install its dependencies in the Colab environment:
* **PySpark 3.0.1**
* **py4j 0.10.9**

Install both with pip:

In [None]:
!pip install pyspark==3.0.1 py4j==0.10.9 

Collecting pyspark==3.0.1
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=9719bb758d1d81daaa7963650e2030650873eeb3dd55a396bf38df4c38b26daf
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0

Check your installation by creating a Spark session.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .getOrCreate()

## Reading Data

### Download Kaggle Stock Price Dataset

Use your Kaggle API token (`kaggle.json`) to download the US stock prices dataset.

In [None]:
from google.colab import files

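# Upload your kaggle.json file (API token)
files.upload()

# Create the Kaggle config directory if it does not already exist
!mkdir -p ~/.kaggle

!cp kaggle.json ~/.kaggle/

# Restrict permissions on the token file so only the owner can read it
!chmod 600 ~/.kaggle/kaggle.json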

Saving kaggle.json to kaggle.json


In [None]:
!kaggle datasets download -d dinnymathew/usstockprices

Downloading usstockprices.zip to /content
 14% 5.00M/35.5M [00:00<00:00, 48.3MB/s]
100% 35.5M/35.5M [00:00<00:00, 174MB/s] 


In [None]:
!ls

kaggle.json  sample_data  usstockprices.zip


In [None]:
!mkdir data

!unzip usstockprices -d data

Archive:  usstockprices.zip
  inflating: data/stocks_price_final.csv  


In [None]:
!ls -l data/

total 218952
-rw-r--r-- 1 root root 224205096 Jul 24  2020 stocks_price_final.csv


## Import Modules

In [None]:
from pyspark.sql import functions as f

import pandas as pd

import seaborn as sns

import matplotlib.pyplot as plt

%matplotlib inline

## Read Data

In [None]:
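from pyspark.sql.types import *
# Note: this star import replaces Python built-ins such as sum, min, and max
# with Spark column functions of the same name for the rest of the notebook.
from pyspark.sql.functions import *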

data_schema = [
               StructField('_c0', IntegerType(), True),
               StructField('symbol', StringType(), True),
               StructField('data', DateType(), True),
               StructField('open', DoubleType(), True),
               StructField('high', DoubleType(), True),
               StructField('low', DoubleType(), True),
               StructField('close', DoubleType(), True),
               StructField('volume', IntegerType(), True),
               StructField('adjusted', DoubleType(), True),
               StructField('market.cap', StringType(), True),
               StructField('sector', StringType(), True),
               StructField('industry', StringType(), True),
               StructField('exchange', StringType(), True),
            ]

final_struc = StructType(fields=data_schema)

In [None]:
data = spark.read.csv(
    'data/stocks_price_final.csv',
    sep = ',',
    header = True,
    schema = final_struc
    )

In [None]:
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- symbol: string (nullable = true)
 |-- data: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjusted: double (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



In [None]:
data.show(5)

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      data|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  5|   TXG|2019-09-18|56.849998|    62.2

In [None]:
data = data.withColumnRenamed('market.cap', 'market_cap')

## Inspect the data

In [None]:
# Print the schema of the data
data.schema

StructType(List(StructField(_c0,IntegerType,true),StructField(symbol,StringType,true),StructField(data,DateType,true),StructField(open,DoubleType,true),StructField(high,DoubleType,true),StructField(low,DoubleType,true),StructField(close,DoubleType,true),StructField(volume,IntegerType,true),StructField(adjusted,DoubleType,true),StructField(market_cap,StringType,true),StructField(sector,StringType,true),StructField(industry,StringType,true),StructField(exchange,StringType,true)))

In [None]:
data.dtypes

[('_c0', 'int'),
 ('symbol', 'string'),
 ('data', 'date'),
 ('open', 'double'),
 ('high', 'double'),
 ('low', 'double'),
 ('close', 'double'),
 ('volume', 'int'),
 ('adjusted', 'double'),
 ('market_cap', 'string'),
 ('sector', 'string'),
 ('industry', 'string'),
 ('exchange', 'string')]

In [None]:
data.head(3)

[Row(_c0=1, symbol='TXG', data=datetime.date(2019, 9, 12), open=54.0, high=58.0, low=51.0, close=52.75, volume=7326300, adjusted=52.75, market_cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=2, symbol='TXG', data=datetime.date(2019, 9, 13), open=52.75, high=54.355, low=49.150002, close=52.27, volume=1025200, adjusted=52.27, market_cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=3, symbol='TXG', data=datetime.date(2019, 9, 16), open=52.450001, high=56.0, low=52.009998, close=55.200001, volume=269900, adjusted=55.200001, market_cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ')]

In [None]:
data.show(5)

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      data|     open|     high|      low|    close| volume| adjusted|market_cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  5|   TXG|2019-09-18|56.849998|    62.2

## Aggregation functions

In [None]:
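## Find the average opening and closing stock prices
for col_name in ['open', 'close']:
    # collect() returns a list of Rows; [0][0] extracts the single average value
    mean_price = data.select(avg(col_name)).collect()[0][0]
    print(f'Average {col_name} Stock Price: {mean_price}')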

Average open Stock Price: 15070.071703341051
Average close Stock Price: 15032.714854330707


In [None]:
data.select(sum("volume")).show(truncate=False)

+-------------+
|sum(volume)  |
+-------------+
|2411308303088|
+-------------+



In [None]:
data.select(sumDistinct("volume")).show(truncate=False)

+--------------------+
|sum(DISTINCT volume)|
+--------------------+
|1248299451208       |
+--------------------+
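
The two totals above differ because `sumDistinct` adds each distinct value only once, so a volume that occurs on several rows contributes a single time. A minimal plain-Python sketch of that difference follows; the volume values are made up for illustration and are not taken from the dataset. It calls `builtins.sum` explicitly because the earlier star import replaces the built-in `sum` inside this notebook.

```python
import builtins  # the notebook's star import shadows the built-in sum

# Hypothetical volume values; 1000 appears twice on purpose.
volumes = [1000, 2500, 1000, 4000]

# Analogue of sum("volume"): every row contributes.
total = builtins.sum(volumes)

# Analogue of sumDistinct("volume"): each distinct value contributes once.
distinct_total = builtins.sum(set(volumes))

print(total, distinct_total)
```

A distinct sum is rarely a meaningful total for a measure such as trading volume; it is shown here mainly to contrast the two functions.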



In [None]:
data.select(collect_set("exchange")).show(truncate=False)

+---------------------+
|collect_set(exchange)|
+---------------------+
|[NYSE, NASDAQ]       |
+---------------------+



In [None]:
data.select(count("sector")).show(truncate=False)

+-------------+
|count(sector)|
+-------------+
|1729034      |
+-------------+



In [None]:
data.select(countDistinct("sector")).show(truncate=False)

+----------------------+
|count(DISTINCT sector)|
+----------------------+
|12                    |
+----------------------+
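
`count`, `countDistinct`, and `collect_set` all skip null values, and the last two also drop duplicates. The sketch below mimics that behaviour in plain Python on a made-up list of sector labels, with `None` standing in for null; it illustrates the semantics only and is not how Spark evaluates these aggregates.

```python
# Hypothetical sector labels; None stands in for a null value.
sectors = ["Finance", "Energy", None, "Finance", "Energy", "Technology"]

non_null = [s for s in sectors if s is not None]

row_count = len(non_null)              # like count("sector"): nulls are skipped
distinct_values = set(non_null)        # like collect_set("sector"): duplicates dropped
distinct_count = len(distinct_values)  # like countDistinct("sector")

# Sorting is only for stable printing; a set has no inherent order.
print(row_count, distinct_count, sorted(distinct_values))
```

As with a Python set, the element order returned by `collect_set` should not be relied on.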



In [None]:
data.select(kurtosis("volume")).show(truncate=False)

+------------------+
|kurtosis(volume)  |
+------------------+
|1159.3465390444946|
+------------------+



In [None]:
data.select(skewness("volume")).show(truncate=False)


+------------------+
|skewness(volume)  |
+------------------+
|22.534251558551144|
+------------------+



In [None]:
data.select(stddev("volume")).show(truncate=False)


+-------------------+
|stddev_samp(volume)|
+-------------------+
|5187522.908169119  |
+-------------------+



In [None]:
data.select(variance("volume")).show(truncate=False)


+--------------------+
|var_samp(volume)    |
+--------------------+
|2.691039392277939E13|
+--------------------+
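
The column headers `stddev_samp` and `var_samp` show that `stddev` and `variance` return the sample versions, which divide by n - 1. The sketch below reproduces the four statistics in plain Python on a small made-up list. For skewness and kurtosis it assumes the usual population-moment definitions, with kurtosis reported as excess kurtosis (a normal distribution scores 0); that convention is an assumption here, since the notebook itself does not state it.

```python
import math
import statistics

# Hypothetical sample, not taken from the dataset.
values = [2, 4, 4, 4, 5, 5, 7, 9]
n = len(values)
mean = statistics.fmean(values)

# Sample variance and standard deviation (divide by n - 1).
sample_var = statistics.variance(values)
sample_std = statistics.stdev(values)

def central_moment(k):
    """k-th central moment using the population convention (divide by n)."""
    return math.fsum((x - mean) ** k for x in values) / n

m2 = central_moment(2)
skew = central_moment(3) / m2 ** 1.5        # assumed definition of skewness
excess_kurt = central_moment(4) / m2 ** 2 - 3  # assumed definition of kurtosis

print(sample_var, sample_std, skew, excess_kurt)
```

Under these definitions, the large positive skewness and kurtosis reported for `volume` would indicate a long right tail: most days trade modest volumes and a few trade very large ones.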



## Window Functions

In [None]:
from pyspark.sql.window import Window

In [None]:
windowSpec  = Window.partitionBy("sector").orderBy("industry")

data.withColumn("row_number",row_number().over(windowSpec)) \
    .show(10, truncate=False)

+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+----------+
|_c0 |symbol|data      |open |high |low  |close|volume|adjusted|market_cap|sector       |industry         |exchange|row_number|
+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+----------+
|4253|KRKR  |2019-11-08|13.0 |14.5 |12.58|13.06|479100|13.06   |$130.48M  |Miscellaneous|Business Services|NASDAQ  |1         |
|4254|KRKR  |2019-11-11|12.72|12.78|10.7 |10.74|226200|10.74   |$130.48M  |Miscellaneous|Business Services|NASDAQ  |2         |
|4255|KRKR  |2019-11-12|10.33|10.96|8.91 |8.92 |186800|8.92    |$130.48M  |Miscellaneous|Business Services|NASDAQ  |3         |
|4256|KRKR  |2019-11-13|8.88 |9.73 |8.72 |9.21 |117000|9.21    |$130.48M  |Miscellaneous|Business Services|NASDAQ  |4         |
|4257|KRKR  |2019-11-14|9.04 |9.49 |9.04 |9.2  |59700 |9.2     |$130.48M  |Miscellaneous|Business Servic

In [None]:
from pyspark.sql.functions import rank

data.withColumn("rank", rank().over(windowSpec)).show(5)

+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+----+
| _c0|symbol|      data| open| high|  low|close|volume|adjusted|market_cap|       sector|         industry|exchange|rank|
+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+----+
|4253|  KRKR|2019-11-08| 13.0| 14.5|12.58|13.06|479100|   13.06|  $130.48M|Miscellaneous|Business Services|  NASDAQ|   1|
|4254|  KRKR|2019-11-11|12.72|12.78| 10.7|10.74|226200|   10.74|  $130.48M|Miscellaneous|Business Services|  NASDAQ|   1|
|4255|  KRKR|2019-11-12|10.33|10.96| 8.91| 8.92|186800|    8.92|  $130.48M|Miscellaneous|Business Services|  NASDAQ|   1|
|4256|  KRKR|2019-11-13| 8.88| 9.73| 8.72| 9.21|117000|    9.21|  $130.48M|Miscellaneous|Business Services|  NASDAQ|   1|
|4257|  KRKR|2019-11-14| 9.04| 9.49| 9.04|  9.2| 59700|     9.2|  $130.48M|Miscellaneous|Business Services|  NASDAQ|   1|
+----+------+----------+

In [None]:
from pyspark.sql.functions import dense_rank

data.withColumn("dense_rank",dense_rank().over(windowSpec)).show(5)

+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+----------+
| _c0|symbol|      data| open| high|  low|close|volume|adjusted|market_cap|       sector|         industry|exchange|dense_rank|
+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+----------+
|4253|  KRKR|2019-11-08| 13.0| 14.5|12.58|13.06|479100|   13.06|  $130.48M|Miscellaneous|Business Services|  NASDAQ|         1|
|4254|  KRKR|2019-11-11|12.72|12.78| 10.7|10.74|226200|   10.74|  $130.48M|Miscellaneous|Business Services|  NASDAQ|         1|
|4255|  KRKR|2019-11-12|10.33|10.96| 8.91| 8.92|186800|    8.92|  $130.48M|Miscellaneous|Business Services|  NASDAQ|         1|
|4256|  KRKR|2019-11-13| 8.88| 9.73| 8.72| 9.21|117000|    9.21|  $130.48M|Miscellaneous|Business Services|  NASDAQ|         1|
|4257|  KRKR|2019-11-14| 9.04| 9.49| 9.04|  9.2| 59700|     9.2|  $130.48M|Miscellaneous|Business Servic
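
In the outputs above, `rank` and `dense_rank` are 1 on every displayed row because the window is ordered by `industry` and all of these rows share the same industry, so they tie. The three functions differ only in how they treat ties, which the plain-Python sketch below illustrates on a made-up, already sorted list of ordering keys for one partition.

```python
# Hypothetical ordering keys within one partition, already sorted.
keys = ["Advertising", "Advertising", "Banks", "Banks", "Banks", "Coal"]

row_number, rank, dense_rank = [], [], []
for i, key in enumerate(keys):
    # row_number: always increases by one, ties or not.
    row_number.append(i + 1)
    if i > 0 and key == keys[i - 1]:
        # Tied with the previous row: both ranks repeat.
        rank.append(rank[-1])
        dense_rank.append(dense_rank[-1])
    else:
        # New key: rank jumps to the row position (leaving gaps),
        # dense_rank moves to the next integer (no gaps).
        rank.append(i + 1)
        dense_rank.append(dense_rank[-1] + 1 if dense_rank else 1)

for row in zip(keys, row_number, rank, dense_rank):
    print(row)
```

Because tied rows have no defined order relative to each other, the `row_number` assigned within a tie can vary between runs unless the window is ordered by additional columns.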

In [None]:
from pyspark.sql.functions import ntile

data.withColumn("ntile",ntile(4).over(windowSpec)).show(5)

+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+-----+
| _c0|symbol|      data| open| high|  low|close|volume|adjusted|market_cap|       sector|         industry|exchange|ntile|
+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+-----+
|4253|  KRKR|2019-11-08| 13.0| 14.5|12.58|13.06|479100|   13.06|  $130.48M|Miscellaneous|Business Services|  NASDAQ|    1|
|4254|  KRKR|2019-11-11|12.72|12.78| 10.7|10.74|226200|   10.74|  $130.48M|Miscellaneous|Business Services|  NASDAQ|    1|
|4255|  KRKR|2019-11-12|10.33|10.96| 8.91| 8.92|186800|    8.92|  $130.48M|Miscellaneous|Business Services|  NASDAQ|    1|
|4256|  KRKR|2019-11-13| 8.88| 9.73| 8.72| 9.21|117000|    9.21|  $130.48M|Miscellaneous|Business Services|  NASDAQ|    1|
|4257|  KRKR|2019-11-14| 9.04| 9.49| 9.04|  9.2| 59700|     9.2|  $130.48M|Miscellaneous|Business Services|  NASDAQ|    1|
+----+------+---
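
`ntile(4)` splits each ordered partition into four buckets of nearly equal size, which is why the first rows of a large partition all fall in bucket 1. A minimal sketch of the bucket assignment, assuming the standard SQL rule that bucket sizes differ by at most one and the larger buckets come first; `ntile_labels` is a hypothetical helper name chosen so it does not clash with the imported `ntile`.

```python
def ntile_labels(num_rows, buckets):
    """Return the bucket number (1-based) for each row position in order."""
    base, extra = divmod(num_rows, buckets)
    labels = []
    for b in range(1, buckets + 1):
        # The first `extra` buckets receive one additional row.
        size = base + (1 if b <= extra else 0)
        labels.extend([b] * size)
    return labels

# Ten rows into four buckets: sizes 3, 3, 2, 2.
print(ntile_labels(10, 4))
```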

In [None]:
from pyspark.sql.functions import lag    

data.withColumn("lag",lag("adjusted",2).over(windowSpec)).show(5)

+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+-----+
| _c0|symbol|      data| open| high|  low|close|volume|adjusted|market_cap|       sector|         industry|exchange|  lag|
+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+-----+
|4253|  KRKR|2019-11-08| 13.0| 14.5|12.58|13.06|479100|   13.06|  $130.48M|Miscellaneous|Business Services|  NASDAQ| null|
|4254|  KRKR|2019-11-11|12.72|12.78| 10.7|10.74|226200|   10.74|  $130.48M|Miscellaneous|Business Services|  NASDAQ| null|
|4255|  KRKR|2019-11-12|10.33|10.96| 8.91| 8.92|186800|    8.92|  $130.48M|Miscellaneous|Business Services|  NASDAQ|13.06|
|4256|  KRKR|2019-11-13| 8.88| 9.73| 8.72| 9.21|117000|    9.21|  $130.48M|Miscellaneous|Business Services|  NASDAQ|10.74|
|4257|  KRKR|2019-11-14| 9.04| 9.49| 9.04|  9.2| 59700|     9.2|  $130.48M|Miscellaneous|Business Services|  NASDAQ| 8.92|
+----+------+---

In [None]:
from pyspark.sql.functions import lead    

data.withColumn("lead",lead("adjusted",2).over(windowSpec)).show(5)

+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+----+
| _c0|symbol|      data| open| high|  low|close|volume|adjusted|market_cap|       sector|         industry|exchange|lead|
+----+------+----------+-----+-----+-----+-----+------+--------+----------+-------------+-----------------+--------+----+
|4253|  KRKR|2019-11-08| 13.0| 14.5|12.58|13.06|479100|   13.06|  $130.48M|Miscellaneous|Business Services|  NASDAQ|8.92|
|4254|  KRKR|2019-11-11|12.72|12.78| 10.7|10.74|226200|   10.74|  $130.48M|Miscellaneous|Business Services|  NASDAQ|9.21|
|4255|  KRKR|2019-11-12|10.33|10.96| 8.91| 8.92|186800|    8.92|  $130.48M|Miscellaneous|Business Services|  NASDAQ| 9.2|
|4256|  KRKR|2019-11-13| 8.88| 9.73| 8.72| 9.21|117000|    9.21|  $130.48M|Miscellaneous|Business Services|  NASDAQ|9.98|
|4257|  KRKR|2019-11-14| 9.04| 9.49| 9.04|  9.2| 59700|     9.2|  $130.48M|Miscellaneous|Business Services|  NASDAQ| 9.6|
+----+------+----------+
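
`lag(col, 2)` returns the value from two rows earlier in the window and `lead(col, 2)` the value from two rows later, with null where no such row exists. The sketch below applies the same shift in plain Python to the five `adjusted` values displayed above; `shift` is a hypothetical helper, not a Spark function.

```python
def shift(values, offset):
    """Positive offset looks back (lag); negative offset looks ahead (lead)."""
    n = len(values)
    return [
        values[i - offset] if 0 <= i - offset < n else None
        for i in range(n)
    ]

# The five adjusted prices displayed in the output above.
adjusted = [13.06, 10.74, 8.92, 9.21, 9.2]

lag2 = shift(adjusted, 2)
lead2 = shift(adjusted, -2)

print(lag2)
print(lead2)
```

The `lag2` list matches the `lag` column shown earlier. The last two `lead2` entries are `None` only because this sketch stops at five rows; in the notebook the partition continues, so those rows pick up values from later rows.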



---




