-
Notifications
You must be signed in to change notification settings - Fork 0
/
ChainedTransformations.scala
164 lines (138 loc) · 5.83 KB
/
ChainedTransformations.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
/**
 * Extension methods for [[DataFrame]] that compute per-label album popularity.
 *
 * Importing `AlbumsImplicitClassTest._` brings the implicit class `ClassDataFrame` into
 * scope, so every DataFrame gains two methods callable with dot notation (no explicit
 * wrapping is needed):
 *  - `numAlbumPopularity` sums `album_popularity` per `label`.
 *  - `greaterThan6kAlbums` keeps the rows whose `summed_album_popularity` is >= 60000.
 *
 * The methods are meant to be chained:
 * {{{
 * import AlbumsImplicitClassTest._
 * df.numAlbumPopularity.greaterThan6kAlbums
 * }}}
 */
object AlbumsImplicitClassTest {
  // `col` is used instead of the `$"..."` interpolator so this object does not depend on
  // `spark.implicits._` being imported (which this file never does).
  import org.apache.spark.sql.functions.col

  /** Lower bound (inclusive) used by `greaterThan6kAlbums`: 60,000 despite the "6k" in the method name. */
  private val MinSummedPopularity: Double = 60000.0

  /**
   * Adds album-popularity helpers to a [[DataFrame]].
   *
   * @param df the DataFrame being extended
   */
  implicit class ClassDataFrame(df: DataFrame) {

    /**
     * Computes the total album popularity of each label.
     *
     * Expects `df` to contain `label` and `album_popularity` columns.
     *
     * @return a DataFrame with two columns: `label` and `summed_album_popularity`.
     */
    def numAlbumPopularity: DataFrame =
      df.groupBy("label")
        .agg(sum(col("album_popularity")).as("summed_album_popularity"))

    /**
     * Keeps only the rows whose `summed_album_popularity` is greater than or equal to 60000.
     *
     * Expects `df` to contain a `summed_album_popularity` column, e.g. the output of
     * `numAlbumPopularity`.
     *
     * @return a DataFrame with the same columns as `df`, restricted to the matching rows.
     */
    def greaterThan6kAlbums: DataFrame =
      df.where(col("summed_album_popularity") >= MinSummedPopularity)
  }
}
import AlbumsImplicitClassTest._

/**
 * Entry point demonstrating the implicit-class approach.
 *
 * Loads the Spotify albums CSV, then calls the extension methods provided by
 * [[AlbumsImplicitClassTest]] (`numAlbumPopularity` followed by `greaterThan6kAlbums`)
 * with DataFrame dot notation, and prints the result to the console with `show()`.
 *
 * Arguments (both optional):
 *  - `args(0)`: a greeting printed before the job starts (defaults to "Hello!").
 *  - `args(1)`: path of the input CSV (defaults to `DefaultInputPath`).
 */
object ImplicitClassMain {

  /** CSV read when no path is passed as the second argument. */
  private val DefaultInputPath: String = "/tmp/chanu/dq/spotify/spotify_albums_data_2023.csv"

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession

    // `args(0)` would throw on an empty array; fall back to a default greeting instead.
    val greeting = args.headOption.getOrElse("Hello!")
    println(s"$greeting Your main is running now..")

    val inputPath = args.lift(1).getOrElse(DefaultInputPath)

    // getOrCreate() returns the already-active session when there is one (e.g. a notebook's
    // `spark`), so this no longer relies on an undeclared global.
    val session = SparkSession.builder().getOrCreate()

    /**
     * Loads the data for the Spotify albums.
     * @return a DataFrame with the data from the CSV file at `inputPath`.
     */
    def loadSpotifyAlbum: DataFrame =
      session.read
        .option("inferSchema", "true")
        .option("header", "true")
        .csv(inputPath)

    // The implicit class lets the transformations read as ordinary DataFrame methods.
    val result = loadSpotifyAlbum.numAlbumPopularity.greaterThan6kAlbums

    // Prints the resulting DataFrame to the console.
    result.show()
  }
}
// to execute - ImplicitClassMain.main(Array("Hey Chan!"))
/* Now, let's approach the same problem using the `transform` function. */
/**
 * Two DataFrame transformers designed to be chained with `Dataset.transform` to find the
 * labels with a high total album popularity.
 *
 * Each function has the shape `DataFrame => DataFrame`, so it can be passed directly to
 * `transform`:
 * {{{
 * import ChainedTransformationsTest._
 * df.transform(numAlbumPopularity).transform(greaterThan6kAlbums)
 * }}}
 */
object ChainedTransformationsTest {
  // `col` is used instead of the `$"..."` interpolator so this object does not depend on
  // `spark.implicits._` being imported (which this file never does).
  import org.apache.spark.sql.functions.col

  /** Lower bound (inclusive) used by `greaterThan6kAlbums`: 60,000 despite the "6k" in the name. */
  private val MinSummedPopularity: Double = 60000.0

  /**
   * Groups the input by `label` and sums the `album_popularity` column.
   *
   * @param inputDF The input DataFrame, which should contain `label` and `album_popularity` columns
   * @return A new DataFrame with columns `label` and `summed_album_popularity`, the latter
   *         holding the sum of `album_popularity` for each label
   */
  def numAlbumPopularity(inputDF: DataFrame): DataFrame =
    inputDF
      .groupBy("label")
      .agg(sum(col("album_popularity")).as("summed_album_popularity"))

  /**
   * Keeps only the rows whose `summed_album_popularity` is >= 60000.0.
   *
   * @param inputDF The input DataFrame, which should contain a `summed_album_popularity` column
   *                (e.g. the output of `numAlbumPopularity`)
   * @return A new DataFrame with the same columns, containing only rows with
   *         `summed_album_popularity` >= 60000.0
   */
  def greaterThan6kAlbums(inputDF: DataFrame): DataFrame =
    inputDF.where(col("summed_album_popularity") >= MinSummedPopularity)
}
import ChainedTransformationsTest._

/**
 * Entry point demonstrating the `transform`-based approach.
 *
 * Reads a CSV file of Spotify album data via the local `loadSpotifyAlbum` helper, applies
 * `numAlbumPopularity` and then `greaterThan6kAlbums` using `Dataset.transform`, and finally
 * displays the resulting DataFrame.
 *
 * Arguments (both optional):
 *  - `args(0)`: a greeting printed before the job starts (defaults to "Hello!").
 *  - `args(1)`: path of the input CSV (defaults to `DefaultInputPath`).
 */
object ChainedTransformationMain {

  /** CSV read when no path is passed as the second argument. */
  private val DefaultInputPath: String = "/tmp/chanu/dq/spotify/spotify_albums_data_2023.csv"

  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.SparkSession

    // `args(0)` would throw on an empty array; fall back to a default greeting instead.
    val greeting = args.headOption.getOrElse("Hello!")
    println(s"$greeting Your main is running now..")

    val inputPath = args.lift(1).getOrElse(DefaultInputPath)

    // getOrCreate() returns the already-active session when there is one (e.g. a notebook's
    // `spark`), so this no longer relies on an undeclared global.
    val session = SparkSession.builder().getOrCreate()

    /**
     * Loads a CSV file containing attributes for Spotify albums from `inputPath`.
     */
    def loadSpotifyAlbum: DataFrame =
      session.read
        .option("inferSchema", "true")
        .option("header", "true")
        .csv(inputPath)

    // First sum album_popularity within each label group, then keep only the label groups
    // whose summed album_popularity is at least 60000.
    val result = loadSpotifyAlbum
      .transform(numAlbumPopularity)
      .transform(greaterThan6kAlbums)

    // Displays the resulting DataFrame.
    result.show()
  }
}
// to execute this - ChainedTransformationMain.main(Array("Hey You!"))