# Advanced PySpark SQL 🦸✨

We will explore more advanced functionalities of PySpark SQL.

## What will you learn in this course? 🧐🧐
This lecture covers more advanced PySpark SQL features. Here is the outline:

* Reminder
* Advanced Filters
* Conditionals
    * `.isin()`
    * `.when()`
* Joins
* Array aggregation
    * `F.collect_list()`
    * `F.collect_set()`
    * `F.slice()`
* Window functions
    * `F.rank()`
    * `F.lag()`
    * `F.lead()`
* User defined functions (UDFs)

## Setup 💻💻

In [0]:
# ACCESS_KEY_ID = "<ACCESS_KEY_ID>" # access key of the student account
# SECRET_ACCESS_KEY = "<SECRET_ACCESS_KEY>" # secret key of the student account

# hadoop_conf = spark._jsc.hadoopConfiguration()
# hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY_ID)
# hadoop_conf.set("fs.s3a.secret.key", SECRET_ACCESS_KEY)
# hadoop_conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") 

songs = spark.read.parquet("s3://full-stack-bigdata-datasets/Big_Data/YOUTUBE/items_selected.parquet")
songs.printSchema()
songs.count(), len(songs.columns)

root
 |-- contentDetails_duration: string (nullable = true)
 |-- id: string (nullable = true)
 |-- snippet_channelId: string (nullable = true)
 |-- snippet_channelTitle: string (nullable = true)
 |-- snippet_publishedAt: string (nullable = true)
 |-- snippet_title: string (nullable = true)
 |-- statistics_commentCount: double (nullable = true)
 |-- statistics_dislikeCount: double (nullable = true)
 |-- statistics_viewCount: long (nullable = true)

Out[1]: (3907, 9)

In [0]:
from pyspark.sql import functions as F

## Reminder 🧠🧠
`isNull()` and `isNotNull()`

In [0]:
# We transform the column into a boolean column that is True where a value is missing
# and False otherwise. We then cast this boolean to an integer (True becomes 1,
# False becomes 0) and compute the sum.
# The result gives us the number of missing values in the id column.

# Useful for Steam: count how many nulls a column (here id) holds.
# isNull() returns a boolean column, which can also be used as a filter;
# here we sum it.
songs.select(F.sum(F.col('id').isNull().cast('int')).alias('id')).show()

# Note that we MUST use a Column object to call the method; we cannot simply
# refer to the column by its name

+---+
| id|
+---+
|  0|
+---+



Let's apply this to all columns

In [0]:
# Same thing, applied to every column

def count_missing(col_name):
  return F.sum(F.col(col_name).isNull().cast('int')).alias(col_name)


# The list comprehension [count_missing(c) for c in songs.columns] returns a list,
# which we unpack with *.
# songs.select() expects column names or Column objects such as F.col('id'),
# and count_missing() returns a Column, so select() builds a new DataFrame
# that we then convert with .toPandas()
missing_values = songs.select(*[count_missing(c) for c in songs.columns]).toPandas()
missing_values

Unnamed: 0,contentDetails_duration,id,snippet_channelId,snippet_channelTitle,snippet_publishedAt,snippet_title,statistics_commentCount,statistics_dislikeCount,statistics_viewCount
0,0,0,0,0,0,0,27,14,0


## Advanced Filters ☕☕

Filters can be combined to express multiple conditions. PySpark's `Column` objects of `BooleanType` overload Python's bitwise operators (`&`, `|`, `~`) so that they act as logical operators.  

---
- `&`: AND operation `TRUE & FALSE => FALSE`  
- `|`: OR operation `TRUE | FALSE => TRUE`
- `~`: NOT operation `~(TRUE) => FALSE`

In [0]:
# We'll first create a boolean indicating a single youtube channel
filter_channelId = (F.col('snippet_channelId') == 'UCudKvbd6gvbm5UCYRk5tZKA')

# We'll create a boolean indicating the rows where dislikeCount is missing
filter_null_dislikeCount = (F.col('statistics_dislikeCount').isNull())

In [0]:
# Let's create a multiple filter which will keep only rows that belong to the 
# specific channel and have a missing value in the dislikeCount column

# In Python, `and` applies to plain booleans.
# Here filter_channelId is NOT a boolean, it is a Column,
# so we use the & operator instead of `and`
songs.filter(filter_channelId & filter_null_dislikeCount).count()


# This counts the songs of this channel that have a null dislike count

Out[6]: 0

We will use Python's unary `~` (invert) operator (see [doc](https://docs.python.org/3/reference/expressions.html#unary-arithmetic-and-bitwise-operations)).

In [0]:
# Let's check how many rows are NOT in that specific channel and do NOT have a missing value
# in the dislike count column
songs.filter(~filter_channelId & ~filter_null_dislikeCount).count()


# Careful: NOT(A) AND NOT(B) is the same as NOT(A OR B),
# so this excludes every row that belongs to the channel or has a null dislike count

Out[7]: 3853

Equivalently, by De Morgan's law, `(NOT(A) and NOT(B)) <=> NOT(A or B)`, so the same filter can be written as:

In [0]:
songs.filter(~(filter_channelId | filter_null_dislikeCount)).count()

Out[8]: 3853
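
The equivalence used here is De Morgan's law. As a quick sanity check, the following plain-Python sketch (no Spark involved, variable names are illustrative) verifies it for every combination of two booleans:

```python
from itertools import product

# Enumerate every combination of two booleans A and B
cases = list(product([True, False], repeat=2))

# De Morgan's law: NOT(A) AND NOT(B) is the same as NOT(A OR B)
law_holds = all(((not a) and (not b)) == (not (a or b)) for a, b in cases)
print(law_holds)
```

With `Column` objects the same identity applies, using `~`, `&` and `|` in place of `not`, `and` and `or`.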

Warning: do not forget parentheses.
When combining multiple conditions, wrap each condition in parentheses, because `&` and `|` bind more tightly than comparison operators such as `==`.

In [0]:
# This example will fail
# because we did not use parentheses and did not store the boolean conditions
# in variables beforehand as we did in the previous example
songs \
  .filter(F.col('snippet_channelID') == 'UCudKvbd6gvbm5UCYRk5tZKA' & F.col('statistics_dislikeCount').isNull()) \
  .count()

In [0]:
# This will work because the parentheses delimit the two conditions
songs \
  .filter((F.col('snippet_channelID') == 'UCudKvbd6gvbm5UCYRk5tZKA') & (F.col('statistics_dislikeCount').isNull())) \
  .count()

Out[9]: 0
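
To see why the version without parentheses fails, we can ask Python how it parses such an expression. The sketch below uses the standard `ast` module on a stand-in expression (`a`, `b` and `'x'` are placeholders, not columns of our dataset). It shows that `&` binds more tightly than `==`, so the comparison is applied last:

```python
import ast

# Parse a stand-in for: F.col(...) == 'some_id' & F.col(...).isNull()
tree = ast.parse("a == 'x' & b", mode="eval")
top = tree.body

# The top-level node is the comparison and its right-hand side is the & operation,
# i.e. Python reads the expression as: a == ('x' & b)
print(type(top).__name__)
print(type(top.comparators[0]).__name__)
print(type(top.comparators[0].op).__name__)
```

In other words, Python first tries to combine the string with the second condition, which is not what we meant.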

## Conditionals ✔️❌

### `.isin(...)`

If we wish to keep all rows whose value belongs to a list of values, we can use the `.isin` method instead of combining several conditions with the `|` operator.

In [0]:
# Similar to pandas
# Here we extract the 5 channels with the most views

top_channels = songs.groupBy("snippet_channelID").agg(F.sum(F.col("statistics_viewCount")).alias("channel_viewCount"))\
  .orderBy(F.desc(F.col("channel_viewCount"))).limit(5).select("snippet_channelID").toPandas()
top_channels

Unnamed: 0,snippet_channelID
0,UC20vb-R_px4CguHzzBPhoyQ
1,UCQ5kHOKpF3-1_UCKaqXARRg
2,UC1SqP7_RfOC9Jf9L_GRHANg
3,UCFHtCB_FWXQ8GpjgfYcD8-g
4,UCXYRdIXDdeZIf816EWAr5zQ


In [0]:
# For each row, we test whether its channel id belongs to the list of top channel ids
songs.filter(F.col('snippet_channelID').isin(top_channels["snippet_channelID"].to_list())).count()

Out[14]: 14

### `.when()`
It is possible to create a column whose value depends on a boolean condition.

In [0]:
# This cell fails with an AnalysisException: the when() branch returns an integer (5)
# while otherwise() returns a boolean (False), and Spark cannot reconcile the two types
songs_filtered = songs \
  .withColumn('ispopularitem', F.when(F.col('snippet_channelID').isin(top_channels["snippet_channelID"].to_list()), 5).otherwise(False))\
.orderBy(F.desc("ispopularitem"))
songs_filtered.limit(20).toPandas()

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-20892478379237>:1
----> 1 songs_filtered = songs \
      2   .withColumn('ispopularitem', F.when(F.col('snippet_channelID').isin(top_channels["snippet_channelID"].to_list()), 5).otherwise(False))\
      3 .orderBy(F.desc("ispopularitem"))
      4 songs_filtered.limit(20).toPandas()

File /databricks/spark/python/pyspark/instrum

In [0]:
songs_filtered = songs.\
  withColumn('ispopularitem', F.when(F.col('snippet_channelID').isin(top_channels["snippet_channelID"].tolist()), 5).otherwise(0))  # Use 0 as the default value instead of False

songs_filtered = songs_filtered.orderBy(F.desc(F.col("ispopularitem")))
songs_filtered.limit(20).toPandas()

Unnamed: 0,contentDetails_duration,id,snippet_channelId,snippet_channelTitle,snippet_publishedAt,snippet_title,statistics_commentCount,statistics_dislikeCount,statistics_viewCount,ispopularitem
0,PT7M4S,2zNSgSzhBfM,UCXYRdIXDdeZIf816EWAr5zQ,Macklemore LLC,2013-04-17T16:00:22Z,MACKLEMORE & RYAN LEWIS - CAN'T HOLD US FEAT. ...,219774.0,137905.0,810998980,5
1,PT3M53S,QK8mJJJvaes,UCXYRdIXDdeZIf816EWAr5zQ,Macklemore LLC,2012-08-29T15:53:50Z,MACKLEMORE & RYAN LEWIS - THRIFT SHOP FEAT. WA...,479401.0,331818.0,1477980957,5
2,PT6M10S,XbGs_qK2PQA,UC20vb-R_px4CguHzzBPhoyQ,EminemVEVO,2013-11-27T16:50:00Z,Eminem - Rap God (Explicit) [Official Video],1057699.0,437215.0,1108231339,5
3,PT4M19S,YxIiPLVR6NA,UC1SqP7_RfOC9Jf9L_GRHANg,AviciiOfficialVEVO,2013-11-01T13:00:05Z,Avicii - Hey Brother (Lyric),155842.0,92330.0,509219309,5
4,PT4M44S,hT_nvWreIhg,UCQ5kHOKpF3-1_UCKaqXARRg,OneRepublicVEVO,2013-05-31T07:00:36Z,OneRepublic - Counting Stars (Official Music V...,389981.0,397557.0,3058853981,5
5,PT4M15S,RBumgq5yVrA,UCFHtCB_FWXQ8GpjgfYcD8-g,Passenger,2012-07-25T22:28:26Z,Passenger | Let Her Go (Official Video),369022.0,353323.0,2789082191,5
6,PT4M33S,IcrbM1l_BoI,UC1SqP7_RfOC9Jf9L_GRHANg,AviciiOfficialVEVO,2013-07-29T15:55:09Z,Avicii - Wake Me Up (Official Video),581275.0,280081.0,1963215194,5
7,PT4M20S,ab9176Srb5Y,UC20vb-R_px4CguHzzBPhoyQ,EminemVEVO,2013-09-09T15:40:00Z,Eminem - Berzerk (Official Music Video) (Expli...,173640.0,66182.0,249341895,5
8,PT4M,Wb5VOQexMBU,UC1SqP7_RfOC9Jf9L_GRHANg,AviciiOfficialVEVO,2013-08-30T13:00:41Z,Avicii - You Make Me,18398.0,10920.0,73376915,5
9,PT4M27S,uelHwf8o7_U,UC20vb-R_px4CguHzzBPhoyQ,EminemVEVO,2010-08-05T19:09:46Z,Eminem - Love The Way You Lie ft. Rihanna,547709.0,336798.0,2064352955,5
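
`F.when()` calls can also be chained: conditions are tested in order, and rows matching no condition get the `.otherwise()` value (or null if `.otherwise()` is omitted). As a rough plain-Python model of that evaluation order (the function `case_when` and the thresholds are hypothetical, chosen only for illustration):

```python
def case_when(value, branches, otherwise=None):
    """Return the result of the first branch whose condition holds.

    Mimics F.when(c1, r1).when(c2, r2).otherwise(default): conditions are
    tested in order, and None (null) is returned when nothing matches and
    no default is given.
    """
    for condition, result in branches:
        if condition(value):
            return result
    return otherwise

# Hypothetical popularity buckets based on a view count
branches = [
    (lambda views: views >= 1_000_000_000, 2),
    (lambda views: views >= 1_000_000, 1),
]

labels = [case_when(v, branches, otherwise=0) for v in (3058853981, 16488714, 8618)]
no_default = case_when(8618, branches)
print(labels)
print(no_default)
```

Note that every branch in this sketch returns an integer, in line with the requirement that all branches share a compatible type.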


## Joins ▶️◀️
Joins let you bring data from several tables into a single table. All you need is a common key, so that Spark knows which rows should be matched in the joined table. Let's look at an example.

Suppose we wish to add to the songs table the total number of songs and the total number of views accumulated by each channel. We can do this with a join.

In [0]:
# Let's start by creating the aggregated table
# count("*") counts the number of rows per channel
aggregates = songs.groupBy("snippet_channelId").agg(F.sum("statistics_viewCount").alias("totalViews"), F.count("*").alias("totalSongs"))
aggregates.limit(5).toPandas()

Unnamed: 0,snippet_channelId,totalViews,totalSongs
0,UCwNqnXBqaP4yB5S3nnniGrQ,254146,8
1,UCpiZh3AGeTygzfmUgioOFFg,8618,1
2,UC6uf72Eqh6s83_UcUVhWn1Q,51367,1
3,UCTzcJbbC1jIhnOCInyHJzpQ,34740306,1
4,UCb0pvpGeKMRiwqjg2mP5rBA,18769596,1


In [0]:
# Let's now join this table to the original table to get additional information about the channels
# the syntax works in the following way:
# left_table.join(right_table, left_table_column == right_table_column)

# in pandas, the closest equivalent is .merge rather than .join
songs.join(aggregates, songs.snippet_channelId == aggregates.snippet_channelId).limit(5).toPandas()

Unnamed: 0,contentDetails_duration,id,snippet_channelId,snippet_channelTitle,snippet_publishedAt,snippet_title,statistics_commentCount,statistics_dislikeCount,statistics_viewCount,snippet_channelId.1,totalViews,totalSongs
0,PT3M33S,t1l8Z6gLPzo,UCUERSOitwgUq_37kGslN96w,VOLO,2013-07-22T12:09:11Z,"VOLO. ""L'air d'un con""",38,26,223172,UCUERSOitwgUq_37kGslN96w,223172,1
1,PT7M46S,we5gzZq5Avg,UCson549gpvRhPnJ3Whs5onA,LongWayToDream,2012-03-17T08:34:30Z,Julian Jeweil - Air Conditionné,2,3,13409,UCson549gpvRhPnJ3Whs5onA,13409,1
2,PT3M7S,49esza4eiK4,UCcHYZ8Ez4gG_2bHEuBL8IfQ,Downtown Records,2007-09-08T02:02:07Z,Justice - D.A.N.C.E,3168,780,10106655,UCcHYZ8Ez4gG_2bHEuBL8IfQ,28815620,3
3,PT3M43S,BoO6LfR7ca0,UCQ0wLCF7u23gZKJkHFs1Tpg,Music Is Our Drug,2014-01-24T12:52:38Z,Gramatik - Torture (feat. Eric Krasno),6,0,29153,UCQ0wLCF7u23gZKJkHFs1Tpg,29153,1
4,PT5M,DaH4W1rY9us,UCJsTMPZxYD-Q3kEmL4Qijpg,Harvey Pearson,2012-12-02T12:41:13Z,Ben Howard - Oats In The Water,5303,1784,16488714,UCJsTMPZxYD-Q3kEmL4Qijpg,16488714,1


Spark does not add suffixes by default, so the joining key column is duplicated in the result. We can avoid this by passing the column name instead of an equality condition:

In [0]:
# Join on a column that has the SAME NAME in both tables
# The join is an inner join by default,
# since we did not pass how='left', 'right', 'outer'...

# Remember that aggregates was built with a groupBy on snippet_channelId,
# so we are sure every channel is present and no row goes unmatched

# Note that this only works if the joining key column has the same name in both tables
songs.join(aggregates, 'snippet_channelId').limit(5).toPandas()

Unnamed: 0,snippet_channelId,contentDetails_duration,id,snippet_channelTitle,snippet_publishedAt,snippet_title,statistics_commentCount,statistics_dislikeCount,statistics_viewCount,totalViews,totalSongs
0,UCUERSOitwgUq_37kGslN96w,PT3M33S,t1l8Z6gLPzo,VOLO,2013-07-22T12:09:11Z,"VOLO. ""L'air d'un con""",38.0,26.0,223172,223172,1
1,UCson549gpvRhPnJ3Whs5onA,PT7M46S,we5gzZq5Avg,LongWayToDream,2012-03-17T08:34:30Z,Julian Jeweil - Air Conditionné,2.0,3.0,13409,13409,1
2,UCcHYZ8Ez4gG_2bHEuBL8IfQ,PT3M7S,49esza4eiK4,Downtown Records,2007-09-08T02:02:07Z,Justice - D.A.N.C.E,3168.0,780.0,10106655,28815620,3
3,UCQ0wLCF7u23gZKJkHFs1Tpg,PT3M43S,BoO6LfR7ca0,Music Is Our Drug,2014-01-24T12:52:38Z,Gramatik - Torture (feat. Eric Krasno),6.0,0.0,29153,29153,1
4,UCJsTMPZxYD-Q3kEmL4Qijpg,PT5M,DaH4W1rY9us,Harvey Pearson,2012-12-02T12:41:13Z,Ben Howard - Oats In The Water,5303.0,1784.0,16488714,16488714,1
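
Since no `how` argument is given, `.join()` performs an inner join: only keys present in both tables are kept. A small plain-Python model of the difference with a left join (toy data, not taken from the dataset):

```python
# Toy tables: songs as (song, channel) pairs, aggregates as channel -> total views
songs_rows = [("s1", "A"), ("s2", "A"), ("s3", "B"), ("s4", "C")]
totals = {"A": 300, "B": 50}  # channel "C" has no entry

# Inner join: rows whose key has no match are dropped
inner = [(song, ch, totals[ch]) for song, ch in songs_rows if ch in totals]

# Left join: every left row is kept, missing matches become None (null)
left = [(song, ch, totals.get(ch)) for song, ch in songs_rows]

print(inner)
print(left)
```

In PySpark the join type is selected with the `how` argument, for example `songs.join(aggregates, 'snippet_channelId', how='left')`.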


## Array aggregation 📙📘📒📗➡📚
It is possible to aggregate columns to form arrays of values.
### `F.collect_list()`

In [0]:
# collect_list is an aggregation function;
# it is the inverse of explode

# It also shows that we can run operations on the resulting arrays,
# such as F.size() here and F.slice() further down

# Here we'll create a column that contains an array listing all the song titles in each channel.
transactions = songs.groupBy('snippet_channelId').agg(F.collect_list('snippet_title').alias('songs_list'))\
  .withColumn("songCount", F.size("songs_list"))\
  .orderBy(F.desc("songCount"))
transactions.limit(5).toPandas()

Unnamed: 0,snippet_channelId,songs_list,songCount
0,UCudKvbd6gvbm5UCYRk5tZKA,"[Aaron Smith - Dancin (KRONO Remix), Tom Odell...",40
1,UCXIyz409s7bNWVcM-vjfdVA,"[Paradis - La Ballade de Jim, Drew Hill - Soli...",35
2,UC5nc_ZtjKW1htCVZVRxlQAQ,"[Mario M - Let Me Out, Bipolar Sunshine - Rive...",18
3,UCpDJl2EmP7Oh90Vylx0dZtA,"[Martin Garrix - Animals (Radio Edit), Milk & ...",18
4,UCrlNi8Z5TXfWJf4psK76KWw,"[Boards Of Canada - Olson (Midland Edit), Trac...",17


### `F.collect_set()`
Works the same way as `F.collect_list()`, but returns arrays of distinct elements, whereas `F.collect_list()` keeps duplicates.
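
To make the difference concrete, here is a plain-Python sketch of the two aggregations on toy rows (the data is made up, and Python containers stand in for Spark arrays):

```python
from collections import defaultdict

# Toy (channel, title) rows with a duplicated title in channel "A"
rows = [("A", "song1"), ("A", "song1"), ("A", "song2"), ("B", "song3")]

as_list = defaultdict(list)  # behaves like F.collect_list: keeps duplicates
as_set = defaultdict(set)    # behaves like F.collect_set: distinct values only
for channel, title in rows:
    as_list[channel].append(title)
    as_set[channel].add(title)

print(len(as_list["A"]), len(as_set["A"]))
```

Keep in mind that in Spark the order of the collected elements is not guaranteed after a shuffle, for either function.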

### `F.slice()`
The `F.slice()` function lets you select a contiguous range of elements from an array. Array indices start at 1.

In [0]:
# Here we keep the first 2 elements of each array

# and check with F.size() that each slice shown holds 2 elements


transactions \
  .withColumn('items_count', F.size('songs_list')) \
  .withColumn('sliced', F.slice('songs_list', start=1, length=2)) \
  .withColumn('slice_count', F.size('sliced')) \
  .orderBy(F.desc('items_count')) \
  .limit(5).toPandas()

Unnamed: 0,snippet_channelId,songs_list,songCount,items_count,sliced,slice_count
0,UCudKvbd6gvbm5UCYRk5tZKA,"[Aaron Smith - Dancin (KRONO Remix), Tom Odell...",40,40,"[Aaron Smith - Dancin (KRONO Remix), Tom Odell...",2
1,UCXIyz409s7bNWVcM-vjfdVA,"[Paradis - La Ballade de Jim, Drew Hill - Soli...",35,35,"[Paradis - La Ballade de Jim, Drew Hill - Soli...",2
2,UC5nc_ZtjKW1htCVZVRxlQAQ,"[Mario M - Let Me Out, Bipolar Sunshine - Rive...",18,18,"[Mario M - Let Me Out, Bipolar Sunshine - Rivers]",2
3,UCpDJl2EmP7Oh90Vylx0dZtA,"[Martin Garrix - Animals (Radio Edit), Milk & ...",18,18,"[Martin Garrix - Animals (Radio Edit), Milk & ...",2
4,UCrlNi8Z5TXfWJf4psK76KWw,"[Boards Of Canada - Olson (Midland Edit), Trac...",17,17,"[Boards Of Canada - Olson (Midland Edit), Trac...",2
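
Because `F.slice()` uses 1-based indices (and counts from the end when `start` is negative), it does not map directly onto Python slicing. The following plain-Python model (the helper `spark_like_slice` is hypothetical and only approximates Spark's behaviour) illustrates the correspondence:

```python
def spark_like_slice(values, start, length):
    """Plain-Python model of F.slice(values, start, length)."""
    if start == 0:
        raise ValueError("array indices start at 1; start must not be 0")
    # Positive start counts from the front (1-based), negative from the end
    begin = start - 1 if start > 0 else len(values) + start
    if begin < 0:
        return []
    return values[begin:begin + length]

titles = ["t1", "t2", "t3", "t4"]
first_two = spark_like_slice(titles, 1, 2)
last_two = spark_like_slice(titles, -2, 2)
print(first_two, last_two)
```

An array shorter than `length` simply yields a shorter slice, which is why checking the size of the slice can be informative.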


## Window functions 🏢🏢

Window functions compute a value for each row based on a group of related rows (a window), for example all the rows that share the same value of a given column. The syntax strongly resembles that of aggregates.

In [0]:
from pyspark.sql import Window

### `F.rank()`

In [0]:
# We will create a window specification that sorts the data in ascending order
# of statistics_viewCount within each channel id

# A window is defined from the values of a column.
# We start from songs: each song belongs to one channel,
# and we want to rank the songs within the channel they belong to.

# We proceed in two steps, starting with the definition of the window w.
# This is not a groupBy, it is a partition:
# we take a channel id and sort its rows by view count.
# CAREFUL: orderBy is ascending by default.
# See .orderBy(F.desc('statistics_viewCount')) a bit further down
w = Window.partitionBy("snippet_channelId").orderBy('statistics_viewCount')


# Then we use this window to create a rank column ranking each song within its channel.
# F.rank() puts 1 on the first row, 2 on the second row, etc., restarting in each partition
songs \
  .withColumn('rank', F.rank().over(w)) \
  .orderBy('snippet_channelId','rank') \
  .limit(30).toPandas()

Unnamed: 0,contentDetails_duration,id,snippet_channelId,snippet_channelTitle,snippet_publishedAt,snippet_title,statistics_commentCount,statistics_dislikeCount,statistics_viewCount,rank
0,PT2M28S,zCbGTLlEKsY,UC--yIemFNSgwQ0JxyYsABAQ,Lapalux,2014-02-14T13:58:33Z,LPLX - Lonesum Tnite,30.0,9.0,44054,1
1,PT2M51S,8NHJJtYzz7M,UC-0o65Lf1PjgLMn_SNM_-IA,Planet Mu,2012-06-11T21:25:42Z,Kuedo: Ascension Phase Planet Mu,12.0,1.0,14777,1
2,PT4M6S,QG24u-KULrE,UC-1p1N6c89__rKNkXukVSgQ,dreamlandsessions,2012-03-20T19:27:15Z,We Were Evergreen - Summer Flings - #19 The Dr...,102.0,17.0,277211,1
3,PT4M51S,HvY9nHIAneg,UC-4qGwreIdRc0krq63GVU4A,bollyoldisgold,2011-10-11T06:42:12Z,Chura Liya Hai Tumne Jo Dil Ko (Eng Sub) [Full...,2778.0,7987.0,22094792,1
4,PT7M12S,h0BvWkzMKW4,UC-4scrY6QWlXlcMd0v5STTQ,RECONEYEZmedia,2011-06-13T20:37:23Z,DUB FX 'love someone' | Ghent 25/5 | RECONEYEZ...,170.0,15.0,276692,1
5,PT3M52S,4T9r9_8Pd2s,UC-5PWksZaesFlDIgxDIqSPg,v8ford,2008-01-02T14:51:21Z,Free the Robots Jazzhole,203.0,37.0,653574,1
6,PT6M4S,WYMYEQmBT9M,UC-62qhQ9D1n5PFaZH-s6dGg,1000tters,2011-11-22T21:03:55Z,Connan Mockasin - Ashes to Ashes,8.0,1.0,9881,1
7,PT3M37S,ulIOrQasR18,UC-716wgP94vhil91RVJwaIQ,JonLajoie,2011-07-29T12:05:44Z,F**k Everything (Jon Lajoie),27140.0,5098.0,16425788,1
8,PT3M36S,F9S-88WxPdE,UC-8Q-hLdECwQmaWNwXitYDw,KatyPerryVEVO,2013-09-24T21:00:13Z,Katy Perry - Dark Horse (Audio) ft. Juicy J,38381.0,31191.0,88104088,1
9,PT3M57S,XjwZAa2EjKA,UC-8Q-hLdECwQmaWNwXitYDw,KatyPerryVEVO,2013-11-20T08:03:53Z,Katy Perry - Unconditionally (Official),96260.0,76299.0,545105039,2
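
`F.rank()` gives tied rows the same rank and leaves a gap afterwards. The rows shown above contain no ties, so here is a plain-Python sketch of that rule on made-up view counts (the function name is hypothetical):

```python
def rank_values(sorted_values):
    """Rank already-sorted values like F.rank(): ties share a rank, then a gap."""
    ranks = []
    for position, value in enumerate(sorted_values, start=1):
        if position > 1 and value == sorted_values[position - 2]:
            ranks.append(ranks[-1])   # same value as the previous row: same rank
        else:
            ranks.append(position)    # otherwise the rank is the row position
    return ranks

view_counts = sorted([500, 100, 100, 900])
ranks = rank_values(view_counts)
print(list(zip(view_counts, ranks)))
```

If gaps are not wanted, `F.dense_rank()` numbers the distinct values consecutively, and `F.row_number()` ignores ties altogether.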


In [0]:
# Just for fun: Databricks' display() lets you explore the result interactively
display(songs.groupby('snippet_channelId').count().sort('count', ascending=False))

snippet_channelId,count
UCudKvbd6gvbm5UCYRk5tZKA,40
UCXIyz409s7bNWVcM-vjfdVA,35
UC5nc_ZtjKW1htCVZVRxlQAQ,18
UCpDJl2EmP7Oh90Vylx0dZtA,18
UCrlNi8Z5TXfWJf4psK76KWw,17
UCbTlRjKJcUqDMTlafFLiqyg,17
UCxH0sQJKG6Aq9-vFIPnDZ2A,16
UCR8zOlF04Q6gdZIeMtCJZgQ,12
UC0iwHRFpv2_fpojZgQhElEQ,11
UCXAhoI7XO2kafTMjocm0jCg,11


### `F.lag()` and `F.lead()`
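These two functions create a new column containing the value from the previous row (`F.lag()`) or the next row (`F.lead()`) in the window's ordering. Since the window below is sorted by ascending view count, these are the immediately lower and immediately higher view counts.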

In [0]:
# Useful for Steam

# We want to show, next to each row, the view count of the neighbouring row
# in order to appreciate the difference between the two.

# We start by defining the window: rows are sorted by ascending view count.
# Note: this window has no partitionBy, so rows are compared across all channels,
# not within each channel id
w = Window.orderBy('statistics_viewCount')

# We create a new column:
# F.lag fetches the value of the previous row in the window order,
# i.e. the view count just below the current one
songs \
  .withColumn('less_viewed', F.lag('statistics_viewCount').over(w)) \
  .orderBy(F.desc('statistics_viewCount')) \
  .limit(10).toPandas()

Unnamed: 0,contentDetails_duration,id,snippet_channelId,snippet_channelTitle,snippet_publishedAt,snippet_title,statistics_commentCount,statistics_dislikeCount,statistics_viewCount,less_viewed
0,PT4M44S,hT_nvWreIhg,UCQ5kHOKpF3-1_UCKaqXARRg,OneRepublicVEVO,2013-05-31T07:00:36Z,OneRepublic - Counting Stars (Official Music V...,389981.0,397557.0,3058853981,2789082191
1,PT4M15S,RBumgq5yVrA,UCFHtCB_FWXQ8GpjgfYcD8-g,Passenger,2012-07-25T22:28:26Z,Passenger | Let Her Go (Official Video),369022.0,353323.0,2789082191,2064352955
2,PT4M27S,uelHwf8o7_U,UC20vb-R_px4CguHzzBPhoyQ,EminemVEVO,2010-08-05T19:09:46Z,Eminem - Love The Way You Lie ft. Rihanna,547709.0,336798.0,2064352955,1963215194
3,PT4M33S,IcrbM1l_BoI,UC1SqP7_RfOC9Jf9L_GRHANg,AviciiOfficialVEVO,2013-07-29T15:55:09Z,Avicii - Wake Me Up (Official Video),581275.0,280081.0,1963215194,1506602164
4,PT4M4S,8UVNT4wvIGY,UCFC9LamNMmLioW643VZ40OA,gotyemusic,2011-07-05T21:29:29Z,Gotye - Somebody That I Used To Know (feat. Ki...,639673.0,402468.0,1506602164,1477980957
5,PT3M53S,QK8mJJJvaes,UCXYRdIXDdeZIf816EWAr5zQ,Macklemore LLC,2012-08-29T15:53:50Z,MACKLEMORE & RYAN LEWIS - THRIFT SHOP FEAT. WA...,479401.0,331818.0,1477980957,1436168604
6,PT9M17S,8SbUC-UaAxE,UCJN4c_lZorb_0eyIP_tSS3A,GunsNRosesVEVO,2009-12-25T08:37:37Z,Guns N' Roses - November Rain,260028.0,269865.0,1436168604,1425286294
7,PT3M12S,gCYcHz2k5x0,UCpDJl2EmP7Oh90Vylx0dZtA,Spinnin' Records,2013-06-17T14:30:09Z,Martin Garrix - Animals (Official Video),303936.0,311841.0,1425286294,1231470918
8,PT3M35S,hHUbLv4ThOo,UCVWA4btXTFru9qM06FceSag,PitbullVEVO,2013-11-25T18:19:53Z,Pitbull - Timber ft. Ke$ha (Official Video),142086.0,238742.0,1231470918,1214884358
9,PT4M3S,OpQFFLBMEPI,UCXJDX1KK6t121Z9FLhu5o2A,PinkVEVO,2013-02-05T22:00:58Z,P!nk - Just Give Me A Reason ft. Nate Ruess,147319.0,185366.0,1214884358,1206284490


`F.lead()` does the opposite: it fetches the value from the next row.

In [0]:
# The more_viewed column carries the view count of the next more viewed song
songs \
  .withColumn('more_viewed', F.lead('statistics_viewCount').over(w)) \
  .orderBy(F.desc('statistics_viewCount')) \
  .limit(10).toPandas()

Unnamed: 0,contentDetails_duration,id,snippet_channelId,snippet_channelTitle,snippet_publishedAt,snippet_title,statistics_commentCount,statistics_dislikeCount,statistics_viewCount,more_viewed
0,PT4M44S,hT_nvWreIhg,UCQ5kHOKpF3-1_UCKaqXARRg,OneRepublicVEVO,2013-05-31T07:00:36Z,OneRepublic - Counting Stars (Official Music V...,389981,397557,3058853981,
1,PT4M15S,RBumgq5yVrA,UCFHtCB_FWXQ8GpjgfYcD8-g,Passenger,2012-07-25T22:28:26Z,Passenger | Let Her Go (Official Video),369022,353323,2789082191,3058854000.0
2,PT4M27S,uelHwf8o7_U,UC20vb-R_px4CguHzzBPhoyQ,EminemVEVO,2010-08-05T19:09:46Z,Eminem - Love The Way You Lie ft. Rihanna,547709,336798,2064352955,2789082000.0
3,PT4M33S,IcrbM1l_BoI,UC1SqP7_RfOC9Jf9L_GRHANg,AviciiOfficialVEVO,2013-07-29T15:55:09Z,Avicii - Wake Me Up (Official Video),581275,280081,1963215194,2064353000.0
4,PT4M4S,8UVNT4wvIGY,UCFC9LamNMmLioW643VZ40OA,gotyemusic,2011-07-05T21:29:29Z,Gotye - Somebody That I Used To Know (feat. Ki...,639673,402468,1506602164,1963215000.0
5,PT3M53S,QK8mJJJvaes,UCXYRdIXDdeZIf816EWAr5zQ,Macklemore LLC,2012-08-29T15:53:50Z,MACKLEMORE & RYAN LEWIS - THRIFT SHOP FEAT. WA...,479401,331818,1477980957,1506602000.0
6,PT9M17S,8SbUC-UaAxE,UCJN4c_lZorb_0eyIP_tSS3A,GunsNRosesVEVO,2009-12-25T08:37:37Z,Guns N' Roses - November Rain,260028,269865,1436168604,1477981000.0
7,PT3M12S,gCYcHz2k5x0,UCpDJl2EmP7Oh90Vylx0dZtA,Spinnin' Records,2013-06-17T14:30:09Z,Martin Garrix - Animals (Official Video),303936,311841,1425286294,1436169000.0
8,PT3M35S,hHUbLv4ThOo,UCVWA4btXTFru9qM06FceSag,PitbullVEVO,2013-11-25T18:19:53Z,Pitbull - Timber ft. Ke$ha (Official Video),142086,238742,1231470918,1425286000.0
9,PT4M3S,OpQFFLBMEPI,UCXJDX1KK6t121Z9FLhu5o2A,PinkVEVO,2013-02-05T22:00:58Z,P!nk - Just Give Me A Reason ft. Nate Ruess,147319,185366,1214884358,1231471000.0
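
The behaviour at the edges of the window is worth spelling out: the first row has no previous row and the last row has no next row, so Spark returns null there. A plain-Python model on made-up view counts (the functions `lag` and `lead` below are illustrative stand-ins, not the PySpark ones):

```python
def lag(values, offset=1):
    """Value found `offset` rows before each row, None when there is none."""
    return [values[i - offset] if i - offset >= 0 else None for i in range(len(values))]

def lead(values, offset=1):
    """Value found `offset` rows after each row, None when there is none."""
    return [values[i + offset] if i + offset < len(values) else None for i in range(len(values))]

# Made-up view counts, already in the window order (ascending)
views = [10, 25, 40]
less_viewed = lag(views)
more_viewed = lead(views)
print(less_viewed)
print(more_viewed)
```

This matches the empty `more_viewed` cell of the most viewed song above. That missing value is probably also why pandas displays the column as floats, since pandas stores integer columns containing missing values as floating point by default.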


## PySpark's UDF 🧑‍💻

In Spark SQL we can define our own **U**ser **D**efined **F**unctions (aka UDFs) with the `udf` function from the `pyspark.sql.functions` module. The default return type of a UDF is string. To return another type, we need to declare it explicitly using the types from the `pyspark.sql.types` module.

UDFs are useful when you really need a Python function that has no equivalent in Spark. A UDF lets you use the Python function while still benefiting from the Spark framework!

---
> ⚠️  Using Python User Defined Functions (UDFs) in Apache Spark can have a large negative performance impact.

---

In [0]:
# load the table
playlog = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("s3://full-stack-bigdata-datasets/Big_Data/youtube_playlog.csv")
playlog.printSchema()


root
 |-- timestamp: integer (nullable = true)
 |-- user: integer (nullable = true)
 |-- song: string (nullable = true)



In [0]:

from pyspark.sql.functions import from_unixtime, year, month, dayofmonth, dayofyear, weekofyear

# Convert the Unix timestamp into a readable datetime and sort chronologically
playlog = playlog \
  .withColumn('datetime', from_unixtime('timestamp')) \
  .drop('timestamp') \
  .orderBy('datetime')

# Extract calendar features from the datetime column
playlog = playlog \
  .withColumn('year', year('datetime')) \
  .withColumn('month', month('datetime')) \
  .withColumn('dayofmonth', dayofmonth('datetime')) \
  .withColumn('dayofyear', dayofyear('datetime')) \
  .withColumn('weekofyear', weekofyear('datetime'))

playlog.printSchema()
playlog.count(), len(playlog.columns)
playlog.limit(5).toPandas()

Unnamed: 0,user,song,datetime,year,month,dayofmonth,dayofyear,weekofyear
0,4,nRa-eGzpT6o,1965-07-26 03:21:43,1965,7,26,207,30
1,0,t1l8Z6gLPzo,2014-02-14 14:18:53,2014,2,14,45,7
2,22,Q24VZL8wpOM,2014-02-14 14:18:57,2014,2,14,45,7
3,70,VJ6ofd0pB_c,2014-02-14 14:18:57,2014,2,14,45,7
4,1,t1l8Z6gLPzo,2014-02-14 14:18:58,2014,2,14,45,7


First, we need a regular Python function. In our case, it is a simple function that returns the first three characters of a string.

In [0]:
# STEP 1
# Equivalent to apply with a lambda in pandas:
# this is what the function does to a single element of the column
def three_first_letters(song):
  return song[0:3]

Next, we create a UDF from this function and use it.

In [0]:
# STEP 2
# We wrap our function with F.udf, which lets Spark
# apply it in parallel to the rows of a column

from pyspark.sql.types import StringType

three_first_letters_udf = F.udf(
  three_first_letters, StringType())

In [0]:
# STEP 3
# We call three_first_letters_udf on the song column

test = playlog \
  .withColumn('three_first_letters', three_first_letters_udf('song'))
test.printSchema()
test.show(5)

root
 |-- timestamp: integer (nullable = true)
 |-- user: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- three_first_letters: string (nullable = true)

+----------+----+-----------+-------------------+
| timestamp|user|       song|three_first_letters|
+----------+----+-----------+-------------------+
|1392387533|   0|t1l8Z6gLPzo|                t1l|
|1392387538|   1|t1l8Z6gLPzo|                t1l|
|1392387556|   2|t1l8Z6gLPzo|                t1l|
|1392387561|   3|we5gzZq5Avg|                we5|
|1392387566|   4|we5gzZq5Avg|                we5|
+----------+----+-----------+-------------------+
only showing top 5 rows
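
One caveat: Spark passes SQL nulls to a Python UDF as `None`, and `song[0:3]` raises a `TypeError` on `None`. The `song` column is nullable according to the schema, so a defensive variant may be worth considering. A minimal sketch (the name `three_first_letters_safe` is hypothetical):

```python
def three_first_letters_safe(song):
    """Return the first three characters of song, or None for a missing value."""
    if song is None:
        return None
    return song[0:3]

print(three_first_letters_safe("t1l8Z6gLPzo"))
print(three_first_letters_safe(None))

# It can be wrapped exactly like the original function:
# three_first_letters_safe_udf = F.udf(three_first_letters_safe, StringType())
# For this particular task, the built-in F.substring('song', 1, 3)
# avoids a Python UDF altogether.
```

Given the performance warning above, the built-in function is likely the better choice whenever one exists.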



## Resources 📚📚

Here are some resources to dig further.

- [Introducing Window functions in Spark SQL](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html)
- [Getting started with Spark Part 3: UDFs and Window functions](https://datacenternotes.com/2016/10/03/getting-started-with-spark-part-3-udfs-window-functions/)
- [Using Python aggregate UDFs](https://danvatterott.com/blog/2018/09/06/python-aggregate-udfs-in-pyspark/)
- [Creating a CDF in PySpark](https://danvatterott.com/blog/2019/08/26/creating-a-cdf-in-pyspark/) 
- [PySpark UDFs](https://docs.databricks.com/spark/latest/spark-sql/udf-python.html)