### SILVER LAYER SCRIPT

## DATA ACCESS

In [0]:

# Configure Spark to authenticate against the ADLS Gen2 account with a service
# principal (OAuth client-credentials flow).
#
# The client secret is read from a Databricks secret scope instead of being embedded
# in the notebook, so it does not end up in source control or exported notebooks.
# NOTE(review): the scope/key names below are placeholders - create them (or rename
# them) to match your workspace. The secret that used to be hardcoded here has been
# exposed in plain text and should be rotated.
STORAGE_ACCOUNT_HOST = "datalakeproject2.dfs.core.windows.net"
SECRET_SCOPE = "datalakeproject2"
CLIENT_SECRET_KEY = "sp-client-secret"

client_secret = dbutils.secrets.get(scope=SECRET_SCOPE, key=CLIENT_SECRET_KEY)

spark.conf.set(f"fs.azure.account.auth.type.{STORAGE_ACCOUNT_HOST}", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{STORAGE_ACCOUNT_HOST}",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.id.{STORAGE_ACCOUNT_HOST}",
    "c980c0a0-9681-4782-97d4-abfb355f3304",
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.secret.{STORAGE_ACCOUNT_HOST}",
    client_secret,
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{STORAGE_ACCOUNT_HOST}",
    "https://login.microsoftonline.com/c0454d4c-c65b-42f2-b3e5-8a66af788af7/oauth2/token",
)

## DATA LOADING

In [0]:
# Load the raw CSV data from the bronze container.
# The first line of each file is used as the header and column types are inferred
# from the data.
bronze_reader = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
)
df_bronze = bronze_reader.load('abfss://bronze@datalakeproject2.dfs.core.windows.net/csv/raw/')



## DATA CLEANING

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, try_to_date
from pyspark.sql.functions import col, try_to_date, trim, expr


# Project the bronze columns into the silver schema.
# Plain names are passed through unchanged; rank, date and streams are converted with
# try_* expressions, which yield NULL for values that cannot be converted instead of
# failing the query.
silver_projection = [
    "title",
    "try_cast(rank as int) as rank",
    "try_to_date(trim(date), 'yyyy-MM-dd') as date",
    "artist",
    "url",
    "region",
    "chart",
    "trend",
    "try_cast(streams as bigint) as streams",
]

df_silver = df_bronze.selectExpr(*silver_projection)


## REMOVING CORRUPTED ROWS

In [0]:


# Keep only rows where every typed column was converted successfully.
# A NULL in rank, date or streams means the try_* conversion above could not
# interpret the raw value (or the value was missing).
required_columns = ["rank", "date", "streams"]

all_present = col(required_columns[0]).isNotNull()
for required_column in required_columns[1:]:
    all_present = all_present & col(required_column).isNotNull()

df_silver = df_silver.filter(all_present)


In [0]:
# Render df_silver in the notebook output as a visual check after the null filtering.
display(df_silver)


title,rank,date,artist,url,region,chart,trend,streams
Chantaje (feat. Maluma),1,2017-01-01,Shakira,https://open.spotify.com/track/6mICuAdrwEjh6Y6lroV2Kg,Argentina,top200,SAME_POSITION,253019
Vente Pa' Ca (feat. Maluma),2,2017-01-01,Ricky Martin,https://open.spotify.com/track/7DM4BPaS7uofFul3ywMe46,Argentina,top200,MOVE_UP,223988
Reggaetón Lento (Bailemos),3,2017-01-01,CNCO,https://open.spotify.com/track/3AEZUABDXNtecAOSC1qTfo,Argentina,top200,MOVE_DOWN,210943
Safari,4,2017-01-01,"J Balvin, Pharrell Williams, BIA, Sky",https://open.spotify.com/track/6rQSrBHf7HlZjtcMZ4S4bO,Argentina,top200,SAME_POSITION,173865
Shaky Shaky,5,2017-01-01,Daddy Yankee,https://open.spotify.com/track/58IL315gMSTD37DOZPJ2hf,Argentina,top200,MOVE_UP,153956
Traicionera,6,2017-01-01,Sebastian Yatra,https://open.spotify.com/track/5J1c3M4EldCfNxXwrwt8mT,Argentina,top200,MOVE_DOWN,151140
Cuando Se Pone a Bailar,7,2017-01-01,Rombai,https://open.spotify.com/track/1MpKZi1zTXpERKwxmOu1PH,Argentina,top200,MOVE_DOWN,148369
Otra vez (feat. J Balvin),8,2017-01-01,Zion & Lennox,https://open.spotify.com/track/3QwBODjSEzelZyVjxPOHdq,Argentina,top200,MOVE_DOWN,143004
La Bicicleta,9,2017-01-01,"Carlos Vives, Shakira",https://open.spotify.com/track/0sXvAOmXgjR2QUqLK1MltU,Argentina,top200,MOVE_UP,126389
Dile Que Tu Me Quieres,10,2017-01-01,Ozuna,https://open.spotify.com/track/20ZAJdsKB5IGbGj4ilRt2o,Argentina,top200,MOVE_DOWN,112012


## COLUMN STANDARDIZATION

In [0]:

# Normalise column names to snake_case: lower-cased, surrounding whitespace removed,
# spaces and hyphens replaced by underscores.
#
# The names are read from df_silver itself. The previous version iterated over
# `df.columns`, but no `df` is defined anywhere in this notebook, so the cell raised
# NameError on a fresh kernel (or silently used a stale variable left in the session).
standardized_columns = [
    col(column_name).alias(
        column_name.lower()
        .strip()
        .replace(" ", "_")
        .replace("-", "_")
    )
    for column_name in df_silver.columns
]

df_silver = df_silver.select(*standardized_columns)

## TRIM & NORMALIZATION

In [0]:
# Strip leading/trailing whitespace from the free-text columns.
for text_column in ("title", "artist", "region"):
    df_silver = df_silver.withColumn(text_column, trim(col(text_column)))

# Categorical columns are trimmed and additionally lower-cased so that values which
# differ only by case compare equal.
for category_column in ("chart", "trend"):
    df_silver = df_silver.withColumn(category_column, lower(trim(col(category_column))))

## STANDARDIZE CATEGORICAL VALUES

In [0]:
# Map the trend column onto a small set of canonical labels.
#
# The previous version ended with .otherwise("same"), which relabelled every
# unrecognised value - including NULL - as "same" and so inflated the "same" bucket
# in downstream trend counts. Now only values containing "same" map to "same";
# anything else keeps its trimmed, lower-cased value and NULL stays NULL.
# NOTE(review): if the source has further trend values (e.g. a "new entry" marker),
# they now pass through unchanged - confirm the expected value set.
trend_normalized = lower(trim(col("trend")))  # idempotent if already normalised

df_silver = df_silver.withColumn(
    "trend",
    when(trend_normalized.like("%up%"), "move_up")
    .when(trend_normalized.like("%down%"), "move_down")
    .when(trend_normalized.like("%same%"), "same")
    .otherwise(trend_normalized)
)

## REMOVE INVALID BUSINESS RECORDS

In [0]:
# Business rules: a chart entry must have a positive rank, a positive stream count
# and a date. The three predicates are applied one after another; a row has to pass
# all of them to be kept.
df_silver = (
    df_silver
    .where(col("rank") > 0)
    .where(col("streams") > 0)
    .where(col("date").isNotNull())
)

## DE-DUPLICATION

In [0]:
# Keep exactly one row per chart position (date, region, chart, rank).
#
# Within each position the row with the highest stream count wins. title, artist and
# url are added as tie-breakers so that the surviving row is the same on every run
# when several candidates have identical stream counts; ordering by streams alone
# leaves the choice among tied rows up to Spark.
#
# The window spec is named `dedup_window` rather than `window` so that it does not
# shadow the `window` function brought in by `from pyspark.sql.functions import *`.
# `Window` is the pyspark.sql.Window class.
dedup_window = (
    Window
    .partitionBy("date", "region", "chart", "rank")
    .orderBy(
        col("streams").desc(),
        col("title").asc(),
        col("artist").asc(),
        col("url").asc(),
    )
)

df_silver = (
    df_silver
    .withColumn("rn", row_number().over(dedup_window))  # 1 = preferred row
    .filter(col("rn") == 1)
    .drop("rn")  # helper column is not part of the silver schema
)

## WRITE TO SILVER LAYER

In [0]:
# Persist the silver DataFrame as a Delta dataset at the silver container path.
# Existing data at the path is replaced, and the stored schema is allowed to be
# replaced as well.
silver_path = "abfss://silver@datalakeproject2.dfs.core.windows.net/spotify/charts"

silver_writer = (
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
silver_writer.save(silver_path)

## SAVING AS TABLE

In [0]:
# Register the silver data as the table `spotify_silver`, which the SQL cells below
# query.
#
# Changes from the previous version:
# * "overwriteSchema" is set, matching the path-based Delta write. Without it, a
#   change to the silver schema lets the path write succeed but makes this overwrite
#   fail on a schema mismatch.
# * The duplicate `silver_path` assignment was dropped: this write never used it.
# NOTE(review): no path is passed, so the table is stored wherever the metastore
# places it - presumably a second copy of the data; confirm that is intended.
(
    df_silver
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("spotify_silver")
)


### SQL QUERIES

In [0]:
%sql
-- Preview ten rows of the silver table (no ORDER BY, so which rows appear is arbitrary).
SELECT * FROM spotify_silver LIMIT 10;



title,rank,date,artist,url,region,chart,trend,streams
I Took A Pill In Ibiza - Seeb Remix,40,2017-01-01,Mike Posner,https://open.spotify.com/track/17Fd6Yb7mSbinKG8LoWfFl,Argentina,top200,move_down,53270
Picky,45,2017-01-01,Joey Montana,https://open.spotify.com/track/4tPL9PeVZY4c0jUPtSD5nx,Argentina,top200,move_up,51113
Nena,60,2017-01-01,Marama,https://open.spotify.com/track/347TStOSY6IOqMB84Rpxwk,Argentina,top200,move_up,41737
Bronceado,99,2017-01-01,Marama,https://open.spotify.com/track/4smbzhaRt7EYwO73SmFXyR,Argentina,top200,move_up,24077
Hoy,126,2017-01-01,Agapornis,https://open.spotify.com/track/11aJTEmPwKKoKRTpsudE1H,Argentina,top200,move_up,19864
Papercuts (feat. Vera Blue),47,2017-01-01,Illy,https://open.spotify.com/track/6vXHaWpqkbuvzIDvR3LnKd,Australia,top200,move_down,35532
Human,75,2017-01-01,Rag'n'Bone Man,https://open.spotify.com/track/58zsLZPvfflaiIbNWoA22O,Australia,top200,move_up,21806
Uptown Funk (feat. Bruno Mars),112,2017-01-01,Mark Ronson,https://open.spotify.com/track/32OlwWuMpZ6b0aN2RZOeMS,Australia,top200,move_down,15767
Can't Hold Us - feat. Ray Dalton,156,2017-01-01,Macklemore & Ryan Lewis,https://open.spotify.com/track/3bidbhpOYeV4knp8AIu8Xn,Australia,top200,same,12141
I Got You,175,2017-01-01,Bebe Rexha,https://open.spotify.com/track/1FUViuNSldssMIawrOXF2i,Australia,top200,same,10653


## Daily Streams by Region

In [0]:
%sql
-- Gold table: total streams per date and region, rebuilt from spotify_silver.
CREATE OR REPLACE TABLE spotify_gold_daily_region AS
SELECT
  s.date,
  s.region,
  SUM(s.streams) AS total_streams
FROM spotify_silver AS s
GROUP BY
  s.date,
  s.region;

num_affected_rows,num_inserted_rows


## Top Artists

In [0]:
%sql
-- Gold table: total streams per value of the artist column, rebuilt from spotify_silver.
-- The artist column can hold several comma-separated names (e.g. collaborations);
-- each distinct string is aggregated as its own group.
-- NOTE(review): the ORDER BY applies to the CTAS query only; readers of the table
-- should still sort explicitly rather than rely on stored row order.
CREATE OR REPLACE TABLE spotify_gold_top_artists AS
SELECT
  s.artist,
  SUM(s.streams) AS total_streams
FROM spotify_silver AS s
GROUP BY
  s.artist
ORDER BY
  total_streams DESC;


num_affected_rows,num_inserted_rows


## Chart Trend Analysis

In [0]:
%sql
-- Gold table: number of chart rows per date, region and trend label,
-- rebuilt from spotify_silver.
CREATE OR REPLACE TABLE spotify_gold_chart_trends AS
SELECT
  s.date,
  s.region,
  s.trend,
  COUNT(*) AS track_count
FROM spotify_silver AS s
GROUP BY
  s.date,
  s.region,
  s.trend;

num_affected_rows,num_inserted_rows


## Top Tracks

In [0]:
%sql
-- Gold table: total streams per (title, artist) pair, rebuilt from spotify_silver.
-- NOTE(review): the ORDER BY applies to the CTAS query only; readers of the table
-- should still sort explicitly rather than rely on stored row order.
CREATE OR REPLACE TABLE spotify_gold_top_tracks AS
SELECT
  s.title,
  s.artist,
  SUM(s.streams) AS total_streams
FROM spotify_silver AS s
GROUP BY
  s.title,
  s.artist
ORDER BY
  total_streams DESC;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- List the tables visible in the current database to confirm the silver and gold
-- tables were created.
SHOW TABLES;


database,tableName,isTemporary
default,spotify_gold_chart_trends,False
default,spotify_gold_daily_region,False
default,spotify_gold_top_artists,False
default,spotify_gold_top_tracks,False
default,spotify_silver,False
,_sqldf,True
