In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 52.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=8cf81ecebe849e8add2e3065f769ba33acee1d6c2372672091587bd8f9369a14
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [2]:
import pyspark
import pandas as pd
import numpy as np
import time
from datetime import date , timedelta, datetime

In [3]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [4]:
#start the spark session

spark=SparkSession.builder.appName('Covid').getOrCreate()

In [5]:
spark

In [49]:
#Read the data

cases = spark.read.load('Case.csv', format="csv",inferSchema=True,header=True)

In [7]:
cases.show(5)

+--------+--------+------------+-----+--------------------+---------+---------+----------+
| case_id|province|        city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|  Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|   Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|     Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|   Dobong-gu| true|     Day Care Center|       43|37.679422|127.044374|
+--------+--------+------------+-----+--------------------+---------+---------+----------+
only showing top 5 rows



In [8]:
cases.printSchema()

root
 |--  case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [9]:
cases.limit(10).toPandas()

Unnamed: 0,case_id,province,city,group,infection_case,confirmed,latitude,longitude
0,1000001,Seoul,Yongsan-gu,True,Itaewon Clubs,139,37.538621,126.992652
1,1000002,Seoul,Gwanak-gu,True,Richway,119,37.48208,126.901384
2,1000003,Seoul,Guro-gu,True,Guro-gu Call Center,95,37.508163,126.884387
3,1000004,Seoul,Yangcheon-gu,True,Yangcheon Table Tennis Club,43,37.546061,126.874209
4,1000005,Seoul,Dobong-gu,True,Day Care Center,43,37.679422,127.044374
5,1000006,Seoul,Guro-gu,True,Manmin Central Church,41,37.481059,126.894343
6,1000007,Seoul,from other city,True,SMR Newly Planted Churches Group,36,-,-
7,1000008,Seoul,Dongdaemun-gu,True,Dongan Church,17,37.592888,127.056766
8,1000009,Seoul,from other city,True,Coupang Logistics Center,25,-,-
9,1000010,Seoul,Gwanak-gu,True,Wangsung Church,30,37.481735,126.930121


In [12]:
#change the columns name

cases = cases.withColumnRenamed('infection_case','infection_source')

In [13]:
cases.show(5)

+--------+--------+------------+-----+--------------------+---------+---------+----------+
| case_id|province|        city|group|    infection_source|confirmed| latitude| longitude|
+--------+--------+------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|  Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|   Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|     Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|   Dobong-gu| true|     Day Care Center|       43|37.679422|127.044374|
+--------+--------+------------+-----+--------------------+---------+---------+----------+
only showing top 5 rows



In [14]:
#Select

cases = cases.select('province','city','infection_source','confirmed')

cases.show(5)

+--------+------------+--------------------+---------+
|province|        city|    infection_source|confirmed|
+--------+------------+--------------------+---------+
|   Seoul|  Yongsan-gu|       Itaewon Clubs|      139|
|   Seoul|   Gwanak-gu|             Richway|      119|
|   Seoul|     Guro-gu| Guro-gu Call Center|       95|
|   Seoul|Yangcheon-gu|Yangcheon Table T...|       43|
|   Seoul|   Dobong-gu|     Day Care Center|       43|
+--------+------------+--------------------+---------+
only showing top 5 rows



In [20]:
#Sorting

cases.sort("confirmed").show()

+-----------------+---------------+--------------------+---------+
|         province|           city|    infection_source|confirmed|
+-----------------+---------------+--------------------+---------+
|          Jeju-do|              -|contact with patient|        0|
|       Gangwon-do|              -|contact with patient|        0|
|            Seoul|     Gangseo-gu|SJ Investment Cal...|        0|
|            Busan|from other city|Cheongdo Daenam H...|        1|
|     Jeollabuk-do|from other city|  Shincheonji Church|        1|
|            Seoul|from other city|Anyang Gunpo Past...|        1|
|            Seoul|     Gangnam-gu|Gangnam Dongin Ch...|        1|
|           Sejong|from other city|  Shincheonji Church|        1|
|     Jeollanam-do|from other city|  Shincheonji Church|        1|
|          Jeju-do|from other city|       Itaewon Clubs|        1|
|            Seoul|from other city|Daejeon door-to-d...|        1|
|            Seoul|              -|         Orange Life|      

In [17]:
#Descending sort

from pyspark.sql import functions as F

cases.sort(F.desc('confirmed')).show()

+-----------------+---------------+--------------------+---------+
|         province|           city|    infection_source|confirmed|
+-----------------+---------------+--------------------+---------+
|            Daegu|         Nam-gu|  Shincheonji Church|     4511|
|            Daegu|              -|contact with patient|      917|
|            Daegu|              -|                 etc|      747|
| Gyeongsangbuk-do|from other city|  Shincheonji Church|      566|
|      Gyeonggi-do|              -|     overseas inflow|      305|
|            Seoul|              -|     overseas inflow|      298|
|            Daegu|   Dalseong-gun|Second Mi-Ju Hosp...|      196|
| Gyeongsangbuk-do|              -|contact with patient|      190|
|            Seoul|              -|contact with patient|      162|
|            Seoul|     Yongsan-gu|       Itaewon Clubs|      139|
| Gyeongsangbuk-do|              -|                 etc|      133|
|            Daegu|         Seo-gu|Hansarang Convale...|      

In [24]:
#Column type

cases =  cases.withColumn('confirmed', F.col('confirmed').cast(IntegerType()))
cases =  cases.withColumn('city', F.col('city').cast(StringType()))


In [25]:
cases.dtypes

[('province', 'string'),
 ('city', 'string'),
 ('infection_source', 'string'),
 ('confirmed', 'int')]

In [28]:
#filter
# &, |, ~

cases.filter((cases.confirmed>10) & (cases.province=='Daegu')).show()

+--------+------------+--------------------+---------+
|province|        city|    infection_source|confirmed|
+--------+------------+--------------------+---------+
|   Daegu|      Nam-gu|  Shincheonji Church|     4511|
|   Daegu|Dalseong-gun|Second Mi-Ju Hosp...|      196|
|   Daegu|      Seo-gu|Hansarang Convale...|      124|
|   Daegu|Dalseong-gun|Daesil Convalesce...|      101|
|   Daegu|     Dong-gu|     Fatima Hospital|       39|
|   Daegu|           -|     overseas inflow|       41|
|   Daegu|           -|contact with patient|      917|
|   Daegu|           -|                 etc|      747|
+--------+------------+--------------------+---------+



In [30]:
#Groupby

cases.groupby(['province','city']).agg(F.sum('confirmed'),F.max('confirmed')).show()

+----------------+---------------+--------------+--------------+
|        province|           city|sum(confirmed)|max(confirmed)|
+----------------+---------------+--------------+--------------+
|Gyeongsangnam-do|       Jinju-si|             9|             9|
|           Seoul|        Guro-gu|           139|            95|
|           Seoul|     Gangnam-gu|            18|             7|
|         Daejeon|              -|           100|            55|
|    Jeollabuk-do|from other city|             6|             3|
|Gyeongsangnam-do|Changnyeong-gun|             7|             7|
|           Seoul|              -|           561|           298|
|         Jeju-do|from other city|             1|             1|
|Gyeongsangbuk-do|              -|           345|           190|
|Gyeongsangnam-do|   Geochang-gun|            18|            10|
|Gyeongsangbuk-do|        Gumi-si|            10|            10|
|         Incheon|from other city|           117|            53|
|           Busan|       

In [31]:
cases.groupby(['province','city']).agg(F.sum('confirmed').alias(('TotalConfirmed')),F.max('confirmed').alias('MaxfromOneConfirmedCase')).show()

+----------------+---------------+--------------+-----------------------+
|        province|           city|TotalConfirmed|MaxfromOneConfirmedCase|
+----------------+---------------+--------------+-----------------------+
|Gyeongsangnam-do|       Jinju-si|             9|                      9|
|           Seoul|        Guro-gu|           139|                     95|
|           Seoul|     Gangnam-gu|            18|                      7|
|         Daejeon|              -|           100|                     55|
|    Jeollabuk-do|from other city|             6|                      3|
|Gyeongsangnam-do|Changnyeong-gun|             7|                      7|
|           Seoul|              -|           561|                    298|
|         Jeju-do|from other city|             1|                      1|
|Gyeongsangbuk-do|              -|           345|                    190|
|Gyeongsangnam-do|   Geochang-gun|            18|                     10|
|Gyeongsangbuk-do|        Gumi-si|    

In [32]:
# join

regions = spark.read.load('Region.csv',format='csv',inferSchema=True,header=True)

In [33]:
regions.show()

+-----+--------+-------------+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
| code|province|         city| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|
+-----+--------+-------------+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|10000|   Seoul|        Seoul|37.566953|126.977977|                    607|               830|              48|         1.44|                   15.38|                5.8|             22739|
|10010|   Seoul|   Gangnam-gu|37.518421|127.047222|                     33|                38|               0|         4.18|                   13.17|                4.3|              3088|
|10020|   Seoul|  Gangdong-gu|37.530492|127.123837

In [34]:
regions.limit(10).toPandas()

Unnamed: 0,code,province,city,latitude,longitude,elementary_school_count,kindergarten_count,university_count,academy_ratio,elderly_population_ratio,elderly_alone_ratio,nursing_home_count
0,10000,Seoul,Seoul,37.566953,126.977977,607,830,48,1.44,15.38,5.8,22739
1,10010,Seoul,Gangnam-gu,37.518421,127.047222,33,38,0,4.18,13.17,4.3,3088
2,10020,Seoul,Gangdong-gu,37.530492,127.123837,27,32,0,1.54,14.55,5.4,1023
3,10030,Seoul,Gangbuk-gu,37.639938,127.025508,14,21,0,0.67,19.49,8.5,628
4,10040,Seoul,Gangseo-gu,37.551166,126.849506,36,56,1,1.17,14.39,5.7,1080
5,10050,Seoul,Gwanak-gu,37.47829,126.951502,22,33,1,0.89,15.12,4.9,909
6,10060,Seoul,Gwangjin-gu,37.538712,127.082366,22,33,3,1.16,13.75,4.8,723
7,10070,Seoul,Guro-gu,37.495632,126.88765,26,34,3,1.0,16.21,5.7,741
8,10080,Seoul,Geumcheon-gu,37.456852,126.895229,18,19,0,0.96,16.15,6.7,475
9,10090,Seoul,Nowon-gu,37.654259,127.056294,42,66,6,1.39,15.4,7.4,952


In [71]:
# Join

cases = cases.join(regions,['province','city'],how='left')

cases.show()

+--------+---------------+--------+-----+--------------------+---------+---------+----------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|province|           city| case_id|group|      infection_case|confirmed| latitude| longitude| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|
+--------+---------------+--------+-----+--------------------+---------+---------+----------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|   Seoul|     Yongsan-gu| 1000001| true|       Itaewon Clubs|      139|37.538621|126.992652|10210|37.532768|126.990021|                     15|                13|               1|         0.68|                   16.87|    

In [41]:
#SQL with dataframes

cases_table=cases.registerTempTable('cases_table')



In [44]:
newDF = spark.sql("select * from cases_table where confirmed > 100")

In [45]:
newDF.show()

+-----------------+---------------+--------------------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+
|         province|           city|    infection_source|confirmed| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|
+-----------------+---------------+--------------------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+--------------

In [46]:
#create a New Column

caseswithNewConfirmed = cases.withColumn('NewConfirmed',100+F.col('confirmed'))


caseswithNewConfirmed.show()

+--------+---------------+--------------------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+------------+
|province|           city|    infection_source|confirmed| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|NewConfirmed|
+--------+---------------+--------------------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+---------------

In [72]:
cases.printSchema()

root
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |--  case_id: integer (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- code: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- elementary_school_count: integer (nullable = true)
 |-- kindergarten_count: integer (nullable = true)
 |-- university_count: integer (nullable = true)
 |-- academy_ratio: double (nullable = true)
 |-- elderly_population_ratio: double (nullable = true)
 |-- elderly_alone_ratio: double (nullable = true)
 |-- nursing_home_count: integer (nullable = true)



In [70]:
#Window Functions

In [74]:
timeprovince =spark.read.load("TimeProvince.csv",format="csv",inferSchema=True,header=True)

In [75]:
timeprovince.show()

+-------------------+----+-----------------+---------+--------+--------+
|               date|time|         province|confirmed|released|deceased|
+-------------------+----+-----------------+---------+--------+--------+
|2020-01-20 00:00:00|  16|            Seoul|        0|       0|       0|
|2020-01-20 00:00:00|  16|            Busan|        0|       0|       0|
|2020-01-20 00:00:00|  16|            Daegu|        0|       0|       0|
|2020-01-20 00:00:00|  16|          Incheon|        1|       0|       0|
|2020-01-20 00:00:00|  16|          Gwangju|        0|       0|       0|
|2020-01-20 00:00:00|  16|          Daejeon|        0|       0|       0|
|2020-01-20 00:00:00|  16|            Ulsan|        0|       0|       0|
|2020-01-20 00:00:00|  16|           Sejong|        0|       0|       0|
|2020-01-20 00:00:00|  16|      Gyeonggi-do|        0|       0|       0|
|2020-01-20 00:00:00|  16|       Gangwon-do|        0|       0|       0|
|2020-01-20 00:00:00|  16|Chungcheongbuk-do|       

In [76]:
from pyspark.sql.window import Window

**Ranking**

In [77]:
WindowSpec = Window().partitionBy(['province']).orderBy(F.desc('confirmed'))
cases.withColumn('rank',F.rank().over(WindowSpec)).show()

+-----------------+---------------+--------+-----+--------------------+---------+--------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+----+
|         province|           city| case_id|group|      infection_case|confirmed|latitude|longitude| code| latitude| longitude|elementary_school_count|kindergarten_count|university_count|academy_ratio|elderly_population_ratio|elderly_alone_ratio|nursing_home_count|rank|
+-----------------+---------------+--------+-----+--------------------+---------+--------+---------+-----+---------+----------+-----------------------+------------------+----------------+-------------+------------------------+-------------------+------------------+----+
|            Busan|     Dongnae-gu| 1100001| true|       Onchun Church|       39|35.21628| 129.0771|11060| 35.20506|129.083673|                     22|                31|               0|

In [78]:
cases.printSchema()

root
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |--  case_id: integer (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- code: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- elementary_school_count: integer (nullable = true)
 |-- kindergarten_count: integer (nullable = true)
 |-- university_count: integer (nullable = true)
 |-- academy_ratio: double (nullable = true)
 |-- elderly_population_ratio: double (nullable = true)
 |-- elderly_alone_ratio: double (nullable = true)
 |-- nursing_home_count: integer (nullable = true)



In [79]:
timeprovince.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- time: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- released: integer (nullable = true)
 |-- deceased: integer (nullable = true)



In [86]:
WindowSpec = Window().partitionBy(['province']).orderBy('date')
timeprovinceWithLag = timeprovince.withColumn("lag_7",F.lag('confirmed',7).over(WindowSpec))

In [87]:
timeprovinceWithLag.show()

+-------------------+----+--------+---------+--------+--------+-----+
|               date|time|province|confirmed|released|deceased|lag_7|
+-------------------+----+--------+---------+--------+--------+-----+
|2020-01-20 00:00:00|  16|   Busan|        0|       0|       0| null|
|2020-01-21 00:00:00|  16|   Busan|        0|       0|       0| null|
|2020-01-22 00:00:00|  16|   Busan|        0|       0|       0| null|
|2020-01-23 00:00:00|  16|   Busan|        0|       0|       0| null|
|2020-01-24 00:00:00|  16|   Busan|        0|       0|       0| null|
|2020-01-25 00:00:00|  16|   Busan|        0|       0|       0| null|
|2020-01-26 00:00:00|  16|   Busan|        0|       0|       0| null|
|2020-01-27 00:00:00|  16|   Busan|        0|       0|       0|    0|
|2020-01-28 00:00:00|  16|   Busan|        0|       0|       0|    0|
|2020-01-29 00:00:00|  16|   Busan|        0|       0|       0|    0|
|2020-01-30 00:00:00|  16|   Busan|        0|       0|       0|    0|
|2020-01-31 00:00:00

In [88]:
timeprovinceWithLag.filter(timeprovinceWithLag.date>'2020-03-10').show()

+-------------------+----+--------+---------+--------+--------+-----+
|               date|time|province|confirmed|released|deceased|lag_7|
+-------------------+----+--------+---------+--------+--------+-----+
|2020-03-11 00:00:00|   0|   Busan|       98|      21|       0|   92|
|2020-03-12 00:00:00|   0|   Busan|       99|      29|       0|   92|
|2020-03-13 00:00:00|   0|   Busan|      100|      36|       0|   95|
|2020-03-14 00:00:00|   0|   Busan|      103|      40|       0|   96|
|2020-03-15 00:00:00|   0|   Busan|      106|      52|       1|   96|
|2020-03-16 00:00:00|   0|   Busan|      107|      53|       1|   96|
|2020-03-17 00:00:00|   0|   Busan|      107|      54|       1|   96|
|2020-03-18 00:00:00|   0|   Busan|      107|      58|       1|   98|
|2020-03-19 00:00:00|   0|   Busan|      107|      58|       1|   99|
|2020-03-20 00:00:00|   0|   Busan|      108|      60|       1|  100|
|2020-03-21 00:00:00|   0|   Busan|      108|      67|       1|  103|
|2020-03-22 00:00:00

Lead

In [89]:
WindowSpec = Window().partitionBy(['province']).orderBy('date')
timeprovinceWithLead = timeprovince.withColumn("lead_7",F.lead('confirmed',7).over(WindowSpec))

In [92]:
timeprovinceWithLead.filter(timeprovinceWithLead.date>"2020-03-10").show(30)

+-------------------+----+--------+---------+--------+--------+------+
|               date|time|province|confirmed|released|deceased|lead_7|
+-------------------+----+--------+---------+--------+--------+------+
|2020-03-11 00:00:00|   0|   Busan|       98|      21|       0|   107|
|2020-03-12 00:00:00|   0|   Busan|       99|      29|       0|   107|
|2020-03-13 00:00:00|   0|   Busan|      100|      36|       0|   108|
|2020-03-14 00:00:00|   0|   Busan|      103|      40|       0|   108|
|2020-03-15 00:00:00|   0|   Busan|      106|      52|       1|   108|
|2020-03-16 00:00:00|   0|   Busan|      107|      53|       1|   109|
|2020-03-17 00:00:00|   0|   Busan|      107|      54|       1|   111|
|2020-03-18 00:00:00|   0|   Busan|      107|      58|       1|   112|
|2020-03-19 00:00:00|   0|   Busan|      107|      58|       1|   112|
|2020-03-20 00:00:00|   0|   Busan|      108|      60|       1|   113|
|2020-03-21 00:00:00|   0|   Busan|      108|      67|       1|   114|
|2020-

In [93]:
simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("James", "Sales", 3000),    \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )
 
columns= ["employee_name", "department", "salary"]

In [94]:
df = spark.createDataFrame(data =simpleData, schema=columns)

In [95]:
df.printSchema()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)



In [97]:
df.show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



In [96]:
from pyspark.sql.functions import row_number

In [101]:
WindowSpec = Window().partitionBy('department').orderBy('salary')

df.withColumn("row_number", row_number().over(WindowSpec)).show(truncate=False)
df.withColumn("rank", rank().over(WindowSpec)).show(truncate=False)

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |2         |
|Robert       |Sales     |4100  |3         |
|Saif         |Sales     |4100  |4         |
|Michael      |Sales     |4600  |5         |
+-------------+----------+------+----------+

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|Maria        |Finance   |3000  |1   |
|Scott        |Finance   |3300  |2   |
|Jen          |Finance   |3900  |3   |
|Kumar        |Marketing |2000  |1   |
|Jeff         |Marketing |3000  |2   |
|James        |Sales     |3000  |1   |
|James        |Sal

In [103]:
from pyspark.sql.functions import dense_rank
WindowSpec = Window().partitionBy('department').orderBy('salary')
df.withColumn("rank", rank().over(WindowSpec)).show(truncate=False)
df.withColumn("dense_Rank", dense_rank().over(WindowSpec)).show(truncate=False)

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|Maria        |Finance   |3000  |1   |
|Scott        |Finance   |3300  |2   |
|Jen          |Finance   |3900  |3   |
|Kumar        |Marketing |2000  |1   |
|Jeff         |Marketing |3000  |2   |
|James        |Sales     |3000  |1   |
|James        |Sales     |3000  |1   |
|Robert       |Sales     |4100  |3   |
|Saif         |Sales     |4100  |3   |
|Michael      |Sales     |4600  |5   |
+-------------+----------+------+----+

+-------------+----------+------+----------+
|employee_name|department|salary|dense_Rank|
+-------------+----------+------+----------+
|Maria        |Finance   |3000  |1         |
|Scott        |Finance   |3300  |2         |
|Jen          |Finance   |3900  |3         |
|Kumar        |Marketing |2000  |1         |
|Jeff         |Marketing |3000  |2         |
|James        |Sales     |3000  |1         |
|James        |Sales     |3000  |1         |
|Ro

In [104]:
from pyspark.sql.functions import percent_rank
WindowSpec = Window().partitionBy('department').orderBy('salary')
df.withColumn("PErcent_rank", percent_rank().over(WindowSpec)).show(truncate=False)

+-------------+----------+------+------------+
|employee_name|department|salary|PErcent_rank|
+-------------+----------+------+------------+
|Maria        |Finance   |3000  |0.0         |
|Scott        |Finance   |3300  |0.5         |
|Jen          |Finance   |3900  |1.0         |
|Kumar        |Marketing |2000  |0.0         |
|Jeff         |Marketing |3000  |1.0         |
|James        |Sales     |3000  |0.0         |
|James        |Sales     |3000  |0.0         |
|Robert       |Sales     |4100  |0.5         |
|Saif         |Sales     |4100  |0.5         |
|Michael      |Sales     |4600  |1.0         |
+-------------+----------+------+------------+



In [109]:
from pyspark.sql.functions import ntile
WindowSpec = Window().partitionBy('department').orderBy('salary')
df.withColumn("N_tile", ntile(2).over(WindowSpec)).show(truncate=False)

+-------------+----------+------+------+
|employee_name|department|salary|N_tile|
+-------------+----------+------+------+
|Maria        |Finance   |3000  |1     |
|Scott        |Finance   |3300  |1     |
|Jen          |Finance   |3900  |2     |
|Kumar        |Marketing |2000  |1     |
|Jeff         |Marketing |3000  |2     |
|James        |Sales     |3000  |1     |
|James        |Sales     |3000  |1     |
|Robert       |Sales     |4100  |1     |
|Saif         |Sales     |4100  |2     |
|Michael      |Sales     |4600  |2     |
+-------------+----------+------+------+



In [112]:
from pyspark.sql.functions import cume_dist
df.withColumn("PErcent_rank", percent_rank().over(WindowSpec)).show(truncate=False)
df.withColumn("CUME_DIST", cume_dist().over(WindowSpec)).show(truncate=False)

+-------------+----------+------+------------+
|employee_name|department|salary|PErcent_rank|
+-------------+----------+------+------------+
|Maria        |Finance   |3000  |0.0         |
|Scott        |Finance   |3300  |0.5         |
|Jen          |Finance   |3900  |1.0         |
|Kumar        |Marketing |2000  |0.0         |
|Jeff         |Marketing |3000  |1.0         |
|James        |Sales     |3000  |0.0         |
|James        |Sales     |3000  |0.0         |
|Robert       |Sales     |4100  |0.5         |
|Saif         |Sales     |4100  |0.5         |
|Michael      |Sales     |4600  |1.0         |
+-------------+----------+------+------------+

+-------------+----------+------+------------------+
|employee_name|department|salary|CUME_DIST         |
+-------------+----------+------+------------------+
|Maria        |Finance   |3000  |0.3333333333333333|
|Scott        |Finance   |3300  |0.6666666666666666|
|Jen          |Finance   |3900  |1.0               |
|Kumar        |Marketin

In [114]:
df.withColumn("Lag",lag('salary',2).over(WindowSpec)).show(truncate=False)

+-------------+----------+------+----+
|employee_name|department|salary|Lag |
+-------------+----------+------+----+
|Maria        |Finance   |3000  |null|
|Scott        |Finance   |3300  |null|
|Jen          |Finance   |3900  |3000|
|Kumar        |Marketing |2000  |null|
|Jeff         |Marketing |3000  |null|
|James        |Sales     |3000  |null|
|James        |Sales     |3000  |null|
|Robert       |Sales     |4100  |3000|
|Saif         |Sales     |4100  |3000|
|Michael      |Sales     |4600  |4100|
+-------------+----------+------+----+



In [117]:
df.withColumn("Lead",lead('salary',2).over(WindowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary|Lead|
+-------------+----------+------+----+
|        Maria|   Finance|  3000|3900|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|null|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
|        James|     Sales|  3000|4100|
|        James|     Sales|  3000|4100|
|       Robert|     Sales|  4100|4600|
|         Saif|     Sales|  4100|null|
|      Michael|     Sales|  4600|null|
+-------------+----------+------+----+



In [127]:
from pyspark.sql.functions import col,avg,sum,min,max,row_number,rank,dense_rank

df.withColumn('row',row_number().over(WindowSpec)) \
  .withColumn('avg',avg(col("salary")).over(WindowSpec)) \
  .withColumn('sum',sum(col('salary')).over(WindowSpec)) \
  .withColumn('min',min(col("salary")).over(WindowSpec)) \
  .withColumn('max',max(col('salary')).over(WindowSpec)) \
  .withColumn('rank',rank().over(WindowSpec)) \
  .withColumn("Lag",lag('salary',2).over(WindowSpec))\
  .withColumn("Lead",lead('salary',2).over(WindowSpec))\
  .withColumn('dense_rank',dense_rank().over(WindowSpec)).show()

+-------------+----------+------+---+------+-----+----+----+----+----+----+----------+
|employee_name|department|salary|row|   avg|  sum| min| max|rank| Lag|Lead|dense_rank|
+-------------+----------+------+---+------+-----+----+----+----+----+----+----------+
|        Maria|   Finance|  3000|  1|3000.0| 3000|3000|3000|   1|null|3900|         1|
|        Scott|   Finance|  3300|  2|3150.0| 6300|3000|3300|   2|null|null|         2|
|          Jen|   Finance|  3900|  3|3400.0|10200|3000|3900|   3|3000|null|         3|
|        Kumar| Marketing|  2000|  1|2000.0| 2000|2000|2000|   1|null|null|         1|
|         Jeff| Marketing|  3000|  2|2500.0| 5000|2000|3000|   2|null|null|         2|
|        James|     Sales|  3000|  1|3000.0| 6000|3000|3000|   1|null|4100|         1|
|        James|     Sales|  3000|  2|3000.0| 6000|3000|3000|   1|null|4100|         1|
|       Robert|     Sales|  4100|  3|3550.0|14200|3000|4100|   3|3000|4600|         2|
|         Saif|     Sales|  4100|  4|3550.0

In [None]:
.