# Data Loading and Processing:

In [None]:
import pandas as pd

# Read data
%time pandas_data = pd.read_csv("gs://databricks-bq-2023/spotify_songs.csv")

#Filtering and aggregation
%time result = pandas_data[pandas_data['tempo'] > 100]['track_popularity'].mean()

CPU times: user 233 ms, sys: 35.3 ms, total: 268 ms
Wall time: 602 ms
CPU times: user 7.86 ms, sys: 0 ns, total: 7.86 ms
Wall time: 7.81 ms


In [None]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Read data
# https://raw.githubusercontent.com/rfordatascience/tidytuesday/master/data/2020/2020-01-21/spotify_songs.csv
%time spark_data = spark.read.csv('gs://databricks-bq-2023/spotify_songs.csv', header=True, inferSchema=True)

# Filtering and aggregation
%time result = spark_data.filter(spark_data['tempo'] > 100).groupBy('track_popularity').agg({'tempo': 'mean'}).show()

CPU times: user 15.8 ms, sys: 1.51 ms, total: 17.3 ms
Wall time: 3.5 s
+----------------+------------------+
|track_popularity|        avg(tempo)|
+----------------+------------------+
|              31| 131.3258065843621|
|              85|137.14808771929825|
|              65|134.48608211143699|
|              53|131.67043750000002|
|              78| 132.9451084337349|
|              34| 130.5219695945946|
|              81|133.16793162393162|
|              28| 130.2929683257919|
|              76| 128.7724615384615|
|              27|128.77763184079603|
|              26| 130.8025265700483|
|              44|131.31172144846792|
|              12| 130.0424919354839|
|              91| 141.3078378378378|
|              22|131.40969753086424|
|              93|124.93379487179489|
|              47|135.72169034090913|
|               1| 130.5446576354679|
|              52| 134.2945726495726|
|              13| 131.8744939759036|
+----------------+------------------+
only showing top 

# Data Inspection:

In [None]:
%time pandas_data.head()
%time spark_data.show(5)

CPU times: user 306 µs, sys: 36 µs, total: 342 µs
Wall time: 1.43 ms
+--------------------+--------------------+----------------+----------------+--------------------+--------------------+------------------------+-------------+--------------------+--------------+-----------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+
|            track_id|          track_name|    track_artist|track_popularity|      track_album_id|    track_album_name|track_album_release_date|playlist_name|         playlist_id|playlist_genre|playlist_subgenre|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|duration_ms|
+--------------------+--------------------+----------------+----------------+--------------------+--------------------+------------------------+-------------+--------------------+--------------+-----------------+------------+------+---+--------+----+-----------+---------

In [None]:
# Column names, non-null counts and dtypes of the pandas frame (printed by info()).
%time pandas_data.info()
# Column names and inferred types of the Spark frame (printed by printSchema()).
%time spark_data.printSchema()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 32833 entries, 0 to 32832
Data columns (total 23 columns):
 #   Column                    Non-Null Count  Dtype  
---  ------                    --------------  -----  
 0   track_id                  32833 non-null  object 
 1   track_name                32828 non-null  object 
 2   track_artist              32828 non-null  object 
 3   track_popularity          32833 non-null  int64  
 4   track_album_id            32833 non-null  object 
 5   track_album_name          32828 non-null  object 
 6   track_album_release_date  32833 non-null  object 
 7   playlist_name             32833 non-null  object 
 8   playlist_id               32833 non-null  object 
 9   playlist_genre            32833 non-null  object 
 10  playlist_subgenre         32833 non-null  object 
 11  danceability              32833 non-null  float64
 12  energy                    32833 non-null  float64
 13  key                       32833 non-null  int64  
 14  loudne

In [None]:
# Summary statistics for both frames (timed).
# NOTE(review): the pandas describe() result is not the cell's last expression,
# so presumably only its timing is shown -- confirm if the table is wanted.
%time pandas_data.describe()
# describe() builds the summary DataFrame; show() prints it.
%time spark_data.describe().show()

CPU times: user 57.9 ms, sys: 2.04 ms, total: 59.9 ms
Wall time: 83.5 ms
+-------+--------------------+--------------------+------------------+------------------+--------------------+------------------------+------------------------+--------------------+---------------+--------------------+------------------+-------------------+--------------------+-----------------+------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+
|summary|            track_id|          track_name|      track_artist|  track_popularity|      track_album_id|        track_album_name|track_album_release_date|       playlist_name|    playlist_id|      playlist_genre| playlist_subgenre|       danceability|              energy|              key|          loudness|                mode|        speechiness|       acousticness|   instrumentalness|           liveness|           valence|             tempo

# Selection and Filtering:

In [None]:
%time pandas_data['danceability']
%time pandas_data[pandas_data['danceability'] > 0.75]

%time spark_data.select('danceability')
%time spark_data.filter(spark_data['danceability'] > 0.75)

CPU times: user 115 µs, sys: 0 ns, total: 115 µs
Wall time: 139 µs
CPU times: user 5.98 ms, sys: 0 ns, total: 5.98 ms
Wall time: 6.57 ms
CPU times: user 2.03 ms, sys: 855 µs, total: 2.89 ms
Wall time: 47.1 ms
CPU times: user 2.07 ms, sys: 247 µs, total: 2.31 ms
Wall time: 15.5 ms


DataFrame[track_id: string, track_name: string, track_artist: string, track_popularity: int, track_album_id: string, track_album_name: string, track_album_release_date: string, playlist_name: string, playlist_id: string, playlist_genre: string, playlist_subgenre: string, danceability: string, energy: string, key: string, loudness: string, mode: string, speechiness: double, acousticness: double, instrumentalness: double, liveness: double, valence: double, tempo: double, duration_ms: double]

# Adding and Dropping Columns:

In [None]:
%time pandas_data['new_column'] = pandas_data['danceability'] * 2
%time pandas_data.drop(columns=['track_id'])

from pyspark.sql.functions import col
%time spark_data.withColumn('new_column', spark_data['danceability'] * 2)
%time spark_data.drop('track_id')

CPU times: user 2.3 ms, sys: 0 ns, total: 2.3 ms
Wall time: 2.65 ms
CPU times: user 6.63 ms, sys: 0 ns, total: 6.63 ms
Wall time: 7.1 ms
CPU times: user 6.31 ms, sys: 0 ns, total: 6.31 ms
Wall time: 48.9 ms
CPU times: user 2.52 ms, sys: 0 ns, total: 2.52 ms
Wall time: 40.8 ms


DataFrame[track_name: string, track_artist: string, track_popularity: int, track_album_id: string, track_album_name: string, track_album_release_date: string, playlist_name: string, playlist_id: string, playlist_genre: string, playlist_subgenre: string, danceability: string, energy: string, key: string, loudness: string, mode: string, speechiness: double, acousticness: double, instrumentalness: double, liveness: double, valence: double, tempo: double, duration_ms: double]

# Grouping and Aggregation:

In [None]:
%time pandas_data.groupby('track_artist').agg({'danceability': 'mean'})
%time spark_data.groupBy('track_artist').agg({'danceability': 'mean'})

CPU times: user 18.1 ms, sys: 1.02 ms, total: 19.2 ms
Wall time: 22.1 ms
CPU times: user 4.56 ms, sys: 0 ns, total: 4.56 ms
Wall time: 113 ms


DataFrame[track_artist: string, avg(danceability): double]

# Joins:

In [None]:
# tpch sample "orders" Delta table, collected into a pandas DataFrame.
# Join key used below: o_custkey
orders = (
    spark.read.format('delta')
    .options(header='true')
    .load('dbfs:/databricks-datasets/tpch/delta-001/orders')
    .toPandas()
)

# tpch sample "customer" Delta table, collected into a pandas DataFrame.
# Join key used below: c_custkey
customers = (
    spark.read.format('delta')
    .options(header='true')
    .load('dbfs:/databricks-datasets/tpch/delta-001/customer')
    .toPandas()
)



In [None]:
%time pandas_merged = pd.merge(orders, customers, left_on='o_custkey',right_on="c_custkey")

%time spark_merged = orders.join(customers, left_on='o_custkey',right_on="c_custkey")

CPU times: user 27.7 s, sys: 2.68 s, total: 30.4 s
Wall time: 34.4 s


# User Defined Functions 

In [None]:
def to_percentile(x, column):
    """Rescale ``x`` to a 0-100 position within the range of ``column``.

    Despite the name this is min-max scaling, not a rank-based percentile:
    the column minimum maps to 0 and the column maximum maps to 100.

    Args:
        x: Value (or array-like of values) to rescale.
        column: Object exposing ``.min()`` and ``.max()`` (e.g. a pandas
            Series) that defines the range.

    Returns:
        ``(x - min) / (max - min) * 100``.
    """
    lowest = column.min()
    spread = column.max() - lowest
    return (x - lowest) / spread * 100

# Assuming pandas_data is your DataFrame
%time pandas_data['tempo_percentile'] = pandas_data['tempo'].apply(lambda x: to_percentile(x, pandas_data['tempo']))

CPU times: user 13.8 s, sys: 66.9 ms, total: 13.9 s
Wall time: 16.1 s


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window

# Define the UDF
# NOTE: this re-defines to_percentile from the pandas cell earlier in the
# notebook; this version uses the built-in min()/max(), so `column` can be any
# re-iterable collection of values (including a (min, max) pair).
def to_percentile(x, column):
    """Rescale ``x`` to a 0-100 position within the range of ``column``.

    This is min-max scaling: the smallest value in ``column`` maps to 0 and
    the largest maps to 100.

    Args:
        x: Numeric value to rescale, or None.
        column: Iterable of numeric values defining the range; it is scanned
            with the built-in ``min`` and ``max``.

    Returns:
        ``(x - min) / (max - min) * 100``, or None when ``x`` is None or the
        range is empty (max == min), where the scaled value is undefined.
    """
    if x is None:
        # Null inputs stay null instead of raising inside the UDF.
        return None
    # Scan for each bound once instead of calling min(column) twice.
    lowest = min(column)
    highest = max(column)
    if highest == lowest:
        # Avoid dividing by zero when every value is identical.
        return None
    return ((x - lowest) / (highest - lowest)) * 100

# Register the UDF
percentile_udf = udf(to_percentile, DoubleType())

# Apply the UDF within a Window function
w = Window.orderBy("tempo")

%time pyspark_data = pyspark_data.withColumn("tempo_percentile", percentile_udf(pyspark_data["tempo"], w))