In [1]:
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [2]:
# Spark session for this notebook: runs locally on all cores and loads the
# spark-bigquery connector jar given in `spark.jars`, which provides the
# 'bigquery' data source used by the read/write cells below.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('Transform_in_warehouse')
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar')
    .getOrCreate()
)

In [3]:
# GCS bucket handed to the BigQuery connector through the session-wide
# `temporaryGcsBucket` setting (the connector's staging area for writes).
bucket = "project_data_lake_final-dtc-project"
spark.conf.set(key='temporaryGcsBucket', value=bucket)

#### Read in dataframes

Song feature dataframe (BigQuery table `final-dtc-project.song_data.song_feature_data`)

In [4]:
# Explicit schema for the song-feature table. Declaring the whole-number
# columns as IntegerType keeps them from being read in as LongType.
# Field order: song_id, then the integer columns, then the double columns.

feat_schema = types.StructType(
    [types.StructField('song_id', types.StringType(), True)]
    + [
        types.StructField(int_col, types.IntegerType(), True)
        for int_col in ('duration_ms', 'key', 'mode', 'time_signature')
    ]
    + [
        types.StructField(dbl_col, types.DoubleType(), True)
        for dbl_col in (
            'acousticness', 'danceability', 'energy', 'instrumentalness',
            'liveness', 'loudness', 'speechiness', 'valence', 'tempo',
        )
    ]
)

In [6]:
# Load the song-feature table from BigQuery using the explicit schema above.
df_feat = (
    spark.read
    .format('bigquery')
    .schema(feat_schema)
    .option('table', 'final-dtc-project.song_data.song_feature_data')
    .load()
)

df_feat.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- key: integer (nullable = true)
 |-- mode: integer (nullable = true)
 |-- time_signature: integer (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)



In [7]:
# Expose the song-feature DataFrame to Spark SQL as the view `df_feat`.
df_feat.createOrReplaceTempView(name='df_feat')

Song popularity dataframe (BigQuery table `final-dtc-project.song_data.song_popularity_data`)

In [8]:
# Explicit schema for the song-popularity table; as with the feature table,
# the whole-number columns are declared IntegerType so they aren't read in
# as LongType.

pop_schema = types.StructType(
    [types.StructField('song_id', types.StringType(), True)]
    + [
        types.StructField(int_col, types.IntegerType(), True)
        for int_col in ('rank_score', 'peak_position', 'weeks_on_chart')
    ]
    + [types.StructField('week', types.DateType(), True)]
)

In [9]:
# Load the song-popularity table from BigQuery using the explicit schema above.
df_pop = (
    spark.read
    .format('bigquery')
    .schema(pop_schema)
    .option('table', 'final-dtc-project.song_data.song_popularity_data')
    .load()
)

df_pop.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- rank_score: integer (nullable = true)
 |-- peak_position: integer (nullable = true)
 |-- weeks_on_chart: integer (nullable = true)
 |-- week: date (nullable = true)



In [10]:
# Expose the song-popularity DataFrame to Spark SQL as the view `df_pop`.
df_pop.createOrReplaceTempView(name='df_pop')

#### Transform data

Spark UDFs: map the integer `key` and `mode` codes to readable labels

In [11]:
# Map the integer 'key' and 'mode' columns of the song-feature dataset to
# readable labels, using the encodings from the dataset READMEs, and keep
# only the columns needed downstream.
#
# Codes are looked up defensively: a null code, or one outside the label
# list, becomes null. Without the range check a code of -1 (which Spotify's
# audio-features API uses for "no key detected") would silently map to
# pitches[-1] == 'B' via Python negative indexing, and a null code would
# raise TypeError inside the UDF and fail the job.
#
# NOTE(review): this cell reassigns `df_feat`; re-running it without first
# re-running the load cell would feed already-mapped strings into the UDFs.

pitches = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
modes = ['Minor', 'Major']


def _code_to_label(labels):
    """Build a function that maps an integer code to ``labels[code]``.

    Args:
        labels: Sequence of labels indexed by 0-based integer code.

    Returns:
        A one-argument function returning ``labels[code]`` when
        ``0 <= code < len(labels)``, and ``None`` for a null or
        out-of-range code.
    """
    def lookup(code):
        if code is None or not 0 <= code < len(labels):
            return None
        return labels[code]
    return lookup


mapKeyCol = F.udf(_code_to_label(pitches), types.StringType())
mapModeCol = F.udf(_code_to_label(modes), types.StringType())

df_feat = df_feat \
    .withColumn('key', mapKeyCol(F.col('key'))) \
    .withColumn('mode', mapModeCol(F.col('mode'))) \
    .select('song_id', 'duration_ms', 'key', 'mode', 'time_signature',
            'danceability', 'energy', 'loudness', 'valence', 'tempo')

df_feat.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- key: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- time_signature: integer (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)



In [12]:
# Replace the `df_feat` view so SQL below sees the mapped key/mode labels and
# the reduced column set rather than the raw table.
df_feat.createOrReplaceTempView(name='df_feat')

Spark SQL: filter to number-1 songs and join them with the song features

In [16]:
# Keep only chart entries with rank_score = 1 (number 1 on the chart),
# reduce the week date to its year, and order the rows chronologically.
number_one_sql = """
SELECT song_id, YEAR(week) AS year
FROM df_pop
WHERE rank_score = 1
ORDER BY week
"""
df_pop_tr = spark.sql(number_one_sql)

df_pop_tr.show(n=10)

+--------------------+----+
|             song_id|year|
+--------------------+----+
|1TRvdHDqCIcTQpHTZ...|1964|
|6kR2BCzArduYNuJdt...|1964|
|1FXX4oUbO6eJEszwR...|1964|
|5ysUfRP9UwPZ7Npmh...|1964|
|39xymLsE59O6HtMtq...|1964|
|5cCf88l6a1uXHMSf0...|1964|
|7ssLCxMwgVRAchoKP...|1964|
|7wGbzk3aeGSpnsK2A...|1964|
|0qr77Nfice5jmmGCf...|1964|
|21TQE7gTsbYSDRgit...|1964|
+--------------------+----+
only showing top 10 rows



In [17]:
# Expose the number-1 songs (song_id, year) to Spark SQL as `df_pop_tr`.
df_pop_tr.createOrReplaceTempView(name='df_pop_tr')

In [18]:
# Left-join the list of number-1 songs by week with the song-features dataset
# on song_id. Every number-1 entry is kept; entries whose song has no feature
# row get nulls in the feature columns.
#
# The match condition must be in ON, not WHERE: a WHERE predicate on
# f.song_id rejects the null-extended rows and silently turns the LEFT JOIN
# into an inner join. song_id is taken from the left side (p) so it is never
# null for unmatched rows.

df_joined = spark.sql("""
SELECT
    p.song_id, duration_ms AS dur_ms, key, mode, time_signature AS time_sign,
    danceability, energy, loudness, valence AS positivity, tempo, year
FROM
    df_pop_tr AS p
LEFT JOIN
    df_feat AS f
    ON f.song_id = p.song_id
""")

df_joined.show(10)
df_joined.printSchema()

+--------------------+------+---+-----+---------+------------+------+--------+-------+-------+----+
|             song_id|dur_ms|key| mode|time_sign|danceability|energy|loudness|valence|  tempo|year|
+--------------------+------+---+-----+---------+------------+------+--------+-------+-------+----+
|1TRvdHDqCIcTQpHTZ...|156800|  C|Major|        4|       0.666|  0.63|  -7.929|  0.615|141.389|1964|
|6kR2BCzArduYNuJdt...|159040|  C|Major|        4|       0.476| 0.525|  -10.82|  0.895|122.023|1964|
|1FXX4oUbO6eJEszwR...|151813|  D|Minor|        5|       0.105| 0.342| -10.903|  0.307| 79.467|1964|
|5ysUfRP9UwPZ7Npmh...|153120|  C|Major|        4|       0.452| 0.951|    3.44|  0.668|127.237|1964|
|39xymLsE59O6HtMtq...|155200| G#|Major|        3|       0.561| 0.484|  -9.678|  0.633|  81.73|1964|
|5cCf88l6a1uXHMSf0...|176300| A#|Major|        4|       0.516| 0.176| -14.337|  0.196| 80.206|1964|
|7ssLCxMwgVRAchoKP...|179978|  F|Major|        3|       0.312| 0.206| -11.915|  0.288|110.597|1964|


#### Write data to BigQuery

In [19]:
# Persist the joined table to BigQuery.
# - The table id is fully qualified with the project, matching the reads above,
#   so the destination doesn't depend on the credentials' default project.
# - mode('overwrite') replaces the table on re-runs; with Spark's default save
#   mode (errorifexists) a second run of the notebook fails because the table
#   already exists.
df_joined.write.format('bigquery') \
    .mode('overwrite') \
    .option('table', 'final-dtc-project.song_data.joined_feat_pop') \
    .save()