In [0]:
########## Run this code snippet when running for the first time and don't repeat it in future (else it will keep on downloading the same stuffs again and again and
########## result in redundant usage of memory)

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apachemirror.wuchna.com/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
import findspark
import pandas as pd
import random
import numpy as np

# Tell the JVM launcher and Spark where the installations from the setup cell live.
os.environ["JAVA_HOME"]   = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"]  = "/content/spark-2.4.4-bin-hadoop2.7"

# Initialise findspark from the absolute SPARK_HOME instead of a path relative
# to the current working directory, so this works regardless of the kernel's cwd.
findspark.init(os.environ["SPARK_HOME"])

# These imports must stay below findspark.init(), which makes pyspark importable.
# NOTE: the star import from pyspark.sql.functions shadows Python builtins such as
# `sum` and `max`; the aggregate cells further down rely on the Spark versions.
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Local Spark session using all available cores; reuses an existing session if any.
spark                      = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
pathname = '/content/sample_data.csv'

# Read the CSV with a header row and inferred column types, then discard
# rows in which every column is null.
df = (
    spark.read
    .csv(pathname, header=True, inferSchema=True)
    .dropna(how='all')
)

# Preview the first three rows.
df.show(3)

+--------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
| Loan_ID|Gender|Married|Dependents|Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|
+--------+------+-------+----------+---------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+
|LP001002|  Male|     No|         0| Graduate|           No|           5849|              0.0|      null|             360|             1|        Urban|          Y|
|LP001003|  Male|    Yes|         1| Graduate|           No|           4583|           1508.0|       128|             360|             1|        Rural|          N|
|LP001005|  Male|    Yes|         0| Graduate|          Yes|           3000|              0.0|        66|             360|             1|        Urban|          Y|
+--------+------

## Some common aggregate functions

In [0]:
# `sum`, `max`, etc. below are the pyspark.sql.functions versions brought in by
# the star import, not the Python builtins.
applicant_income = df['ApplicantIncome']
property_area = df['Property_Area']

# Each aggregate is displayed as its own single-row table, in this order.
# No ordering is imposed on `df`, so first/last simply reflect the order in
# which Spark happens to read the rows.
aggregates = [
    sum(applicant_income),
    max(applicant_income),
    count(property_area),
    mean(applicant_income),
    sumDistinct(applicant_income),
    first(applicant_income),
    last(applicant_income),
    countDistinct(property_area),
]
for aggregate in aggregates:
    df.select(aggregate).show()

+--------------------+
|sum(ApplicantIncome)|
+--------------------+
|             3317724|
+--------------------+

+--------------------+
|max(ApplicantIncome)|
+--------------------+
|               81000|
+--------------------+

+--------------------+
|count(Property_Area)|
+--------------------+
|                 614|
+--------------------+

+--------------------+
|avg(ApplicantIncome)|
+--------------------+
|   5403.459283387622|
+--------------------+

+-----------------------------+
|sum(DISTINCT ApplicantIncome)|
+-----------------------------+
|                      2859142|
+-----------------------------+

+-----------------------------+
|first(ApplicantIncome, false)|
+-----------------------------+
|                         5849|
+-----------------------------+

+----------------------------+
|last(ApplicantIncome, false)|
+----------------------------+
|                        4583|
+----------------------------+

+-----------------------------+
|count(DISTINCT Property_A

## Compute the lag of a time series using windows 

In [0]:
# Small (timeindex, value) series, deliberately out of order to show that the
# window's ordering, not the input order, decides which row is "previous".
data1 = [
    (1, 15),
    (2, 20),
    (5, 66),
    (3, 36),
    (4, 45),
]
df11 = spark.createDataFrame(data1, ("timeindex", "value"))
df11.show()

# No partition columns are given, so the whole frame is treated as a single
# partition ordered by timeindex.
my_window = Window.partitionBy().orderBy("timeindex")

# prev_value holds the value of the preceding row in timeindex order
# (null for the first row, which has no predecessor).
df11 = df11.withColumn("prev_value", lag(df11['value']).over(my_window))
df11.show()

# Arithmetic with a null operand already yields null, so plain subtraction is
# enough. The previous when(..., 'null') form stored the literal text "null"
# and forced the whole column to string type instead of keeping it numeric.
df11 = df11.withColumn("diff", df11['value'] - df11['prev_value'])
df11.show()

+---------+-----+
|timeindex|value|
+---------+-----+
|        1|   15|
|        2|   20|
|        5|   66|
|        3|   36|
|        4|   45|
+---------+-----+

+---------+-----+----------+
|timeindex|value|prev_value|
+---------+-----+----------+
|        1|   15|      null|
|        2|   20|        15|
|        3|   36|        20|
|        4|   45|        36|
|        5|   66|        45|
+---------+-----+----------+

+---------+-----+----------+----+
|timeindex|value|prev_value|diff|
+---------+-----+----------+----+
|        1|   15|      null|null|
|        2|   20|        15|   5|
|        3|   36|        20|  16|
|        4|   45|        36|   9|
|        5|   66|        45|  21|
+---------+-----+----------+----+



## Ranking using windows 

In [0]:
# (person, Score) pairs containing ties, so the ranking functions differ visibly.
data1 = [
    ('A', 1),
    ('B', 5),
    ('C', 5),
    ('D', 3),
    ('E', 1),
]

# No partition columns: every row is ranked against all others by Score.
my_window = Window.partitionBy().orderBy("Score")

# Add one column per ranking function, then the larger and smaller of
# dense_rank and rank for each row.
df12 = (
    spark.createDataFrame(data1, ("person", "Score"))
    .withColumn("dense_rank", dense_rank().over(my_window))
    .withColumn("rank", rank().over(my_window))
    .withColumn("percent_rank", percent_rank().over(my_window))
    .withColumn("ntile", ntile(4).over(my_window))
    .withColumn('Greatest_rank', greatest('dense_rank', 'rank'))
    .withColumn('Lowest_rank', least('dense_rank', 'rank'))
)
df12.show()

+------+-----+----------+----+------------+-----+-------------+-----------+
|person|Score|dense_rank|rank|percent_rank|ntile|Greatest_rank|Lowest_rank|
+------+-----+----------+----+------------+-----+-------------+-----------+
|     A|    1|         1|   1|         0.0|    1|            1|          1|
|     E|    1|         1|   1|         0.0|    1|            1|          1|
|     D|    3|         2|   3|         0.5|    2|            3|          2|
|     B|    5|         3|   4|        0.75|    3|            4|          3|
|     C|    5|         3|   4|        0.75|    4|            4|          3|
+------+-----+----------+----+------------+-----+-------------+-----------+



## Collect values per group using collect_list and collect_set

In [0]:
# Scores listed per person; flattened below into (person, Score) rows in the
# same order as written here.
scores_by_person = [
    ('A', [1, 3, 7, 7]),
    ('B', [5, 1, 12, 3]),
    ('C', [5, 5]),
    ('D', [3, 15, 4]),
    ('E', [1, 2, 9]),
]
data1 = [
    (person, score)
    for person, scores in scores_by_person
    for score in scores
]
df13 = spark.createDataFrame(data1, ("person", "Score"))

In [0]:
# Show, per person, first every Score (duplicates kept) and then the distinct
# Scores only.
for collector in (collect_list, collect_set):
    df13.groupBy('person').agg(collector('Score')).show()

+------+-------------------+
|person|collect_list(Score)|
+------+-------------------+
|     E|          [1, 2, 9]|
|     B|      [5, 1, 12, 3]|
|     D|         [3, 15, 4]|
|     C|             [5, 5]|
|     A|       [1, 3, 7, 7]|
+------+-------------------+

+------+------------------+
|person|collect_set(Score)|
+------+------------------+
|     E|         [9, 1, 2]|
|     B|     [12, 1, 5, 3]|
|     D|        [15, 3, 4]|
|     C|               [5]|
|     A|         [1, 3, 7]|
+------+------------------+



In [0]:
# The frame spans the entire partition, so every row carries the total of all
# Scores belonging to its person.
w = (
    Window
    .partitionBy('person')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# `sum` is the pyspark.sql.functions aggregate (star import), not the builtin.
total_score = sum(df13['Score']).over(w)
df13.withColumn('total_score', total_score).show()

+------+-----+-----------+
|person|Score|total_score|
+------+-----+-----------+
|     E|    1|         12|
|     E|    2|         12|
|     E|    9|         12|
|     B|    5|         21|
|     B|    1|         21|
|     B|   12|         21|
|     B|    3|         21|
|     D|    3|         22|
|     D|   15|         22|
|     D|    4|         22|
|     C|    5|         10|
|     C|    5|         10|
|     A|    1|         18|
|     A|    3|         18|
|     A|    7|         18|
|     A|    7|         18|
+------+-----+-----------+

