In [1]:
import configparser
import datetime 
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import monotonically_increasing_id
import glob

In [2]:
# Mount the blob container from the Azure storage account.
# dbutils.fs.mount raises if the mount point is already in use, so the mount is
# skipped when it already exists; this keeps the notebook re-runnable on a
# cluster that still holds the mount from an earlier run.
SONG_DATA_MOUNT_POINT = "/mnt/song-data"
if not any(m.mountPoint == SONG_DATA_MOUNT_POINT for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
      source = "wasbs://songdata@songdata.blob.core.windows.net",
      mount_point = SONG_DATA_MOUNT_POINT,
      # the storage account key is read from a secret scope, never hard-coded
      extra_configs = {"fs.azure.account.key.songdata.blob.core.windows.net":dbutils.secrets.get(scope = "sc_songlog", key = "songblob")})


In [3]:
#Check the mount by listing it. Two blob containers are accessible.
%sh ls /dbfs/mnt/song-data

In [4]:
#A function to parse out all the file paths in each container
import re

def get_files(filepath):
    """Recursively collect the absolute paths of all ``*.json`` files under a directory.

    Args:
        filepath: Root directory to walk.

    Returns:
        A list of absolute file paths, in directory-walk order. The list is
        empty when ``filepath`` does not exist or holds no JSON files.
    """
    all_files = []
    for root, _dirs, _files in os.walk(filepath):
        # Escape the directory part so glob metacharacters in a folder name
        # (e.g. "[", "*", "?") are matched literally instead of as a pattern.
        pattern = os.path.join(glob.escape(root), '*.json')
        all_files.extend(os.path.abspath(match) for match in glob.glob(pattern))

    return all_files

In [5]:
#Parse the container and edit each path to comply with DBFS syntax.
#Only the leading "/dbfs/" prefix is rewritten to "dbfs:/"; the pattern is
#anchored so a "/dbfs" that happens to appear deeper in a path is left alone.
song_files = [
    re.sub(r'^/dbfs/', 'dbfs:/', path, count=1)
    for path in get_files("/dbfs/mnt/song-data/song-data")
]

In [6]:
#Define a songSchema for songdata
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date
# Explicit schema for the song JSON records: one (column name, Spark type) pair
# per field, each turned into a StructField with the default nullability.
songSchema = R([
    Fld(field_name, field_type())
    for field_name, field_type in (
        ("artist_id", Str),
        ("artist_latitude", Dbl),
        ("artist_location", Str),
        ("artist_longitude", Dbl),
        ("artist_name", Str),
        ("duration", Dbl),
        ("num_songs", Int),
        ("song_id", Str),
        ("title", Str),
        ("year", Int),
    )
])

In [7]:
#Load all the song json files using the explicit songSchema.
#The schema has to be passed through .schema(); .option('schema', ...) only sets
#an unused string option, which leaves Spark inferring the column types.
song_data = spark.read.format('json') \
  .schema(songSchema) \
  .load(song_files)


In [8]:
# Preview the first 5 rows of the loaded song data
song_data.show(5)

In [9]:
# Total number of song records loaded
song_data.count()

In [10]:
# Persist the song data as a parquet-backed table named "song_data",
# replacing the table if it already exists.
table_name = "song_data"
song_writer = song_data.write.format('parquet').mode('OverWrite')
song_writer.saveAsTable(table_name)

In [11]:
#List the Hive warehouse directory on DBFS (presumably to confirm the table written above)
%sh ls /dbfs/user/hive/warehouse

In [12]:
#Register the song DataFrame as a temp view named "song_data" so the spark.sql queries can read it.
#NOTE(review): the view shares its name with the table saved via saveAsTable; unqualified
#"song_data" references in SQL presumably resolve to this temp view -- confirm.
song_data.createOrReplaceTempView("song_data")

In [13]:
# Songs table: the distinct (song_id, title, artist_id, year, duration) rows
# of the song_data view, sorted by year and then artist_id.
song_table = spark.sql("""
            SELECT distinct song_id, title, artist_id, year, duration
            from song_data
            order by year, artist_id
            """)

In [14]:
# write songs table to parquet files partitioned by year and artist
# mode("overwrite") replaces output from an earlier run; the default save mode
# raises when the path already exists, which made the notebook fail on re-run.
song_table.write.mode("overwrite").partitionBy("year","artist_id").parquet("data/output/song_table")

In [15]:
# Artists table: distinct artist rows from the song_data view, with the
# artist_* columns renamed to name / location / latitude / longitude.
artists_table = spark.sql("""
            SELECT distinct artist_id, 
            artist_name as name, 
            artist_location as location, 
            artist_latitude as latitude, 
            artist_longitude as longitude
            from song_data
            """)

In [16]:
# write artists table to parquet files
# mode("overwrite") replaces output from an earlier run; the default save mode
# raises when the path already exists, which made the notebook fail on re-run.
artists_table.write.mode("overwrite").parquet("data/output/artists_table")

In [17]:
#Get the paths of the log files and convert them to DBFS syntax.
#Only the leading "/dbfs/" prefix is rewritten to "dbfs:/"; the pattern is
#anchored so a "/dbfs" that happens to appear deeper in a path is left alone.
log_files = [
    re.sub(r'^/dbfs/', 'dbfs:/', path, count=1)
    for path in get_files("/dbfs/mnt/song-data/log-data")
]

In [18]:
#Load all the log json files into one DataFrame (no schema is supplied here)
log_data = spark.read.json(log_files)
# Preview the first 10 rows
log_data.show(10)

In [19]:
# Total number of log records loaded
log_data.count()

In [20]:
# Keep only the song-play events, i.e. the rows whose page is "NextSong",
# then report how many remain.
is_song_play = col("page") == "NextSong"
df = log_data.where(is_song_play)
df.count()

In [21]:
# create timestamp column from original timestamp column
def get_timestamp(ts_col):
    """Turn an epoch-milliseconds column into a 'yyyy-MM-dd HH:mm:ss' string column.

    Built from native Spark expressions instead of a Python UDF, so rows are
    not shipped to Python workers and a null ts gives a null result rather
    than raising inside the UDF.

    NOTE(review): formatting uses the Spark session time zone, whereas the
    previous UDF used the Python worker's local time zone -- confirm they match.

    Args:
        ts_col: Column holding milliseconds since the Unix epoch.

    Returns:
        A string Column formatted as 'yyyy-MM-dd HH:mm:ss'.
    """
    # A numeric value cast to timestamp is read as seconds since the epoch.
    return date_format((ts_col / 1000).cast("timestamp"), "yyyy-MM-dd HH:mm:ss")

df = df.withColumn("start_time", get_timestamp(df.ts))

In [22]:
# Register df as the "log_data" temp view used by the spark.sql queries
df.createOrReplaceTempView("log_data")

In [23]:
# Users table: distinct (user_id, first_name, last_name, gender, level) rows
# from the log_data view, with userId converted to an integer user_id.
user_table = spark.sql("""
            SELECT distinct int(userId) as user_id, 
            firstName as first_name, 
            lastName as last_name, 
            gender,
            level
            from log_data
            """)

In [24]:
# Number of rows in the users table
user_table.count()

In [25]:
# Write the users table to parquet files.
# mode("overwrite") replaces output from an earlier run; the default save mode
# raises when the path already exists, which made the notebook fail on re-run.
user_table.write.mode("overwrite").parquet("data/output/user_table")

In [26]:
# Time table: each distinct start_time in the log_data view, broken down into
# hour, day of month, week of year, month, year and day of week, sorted by
# year and then month.
time_table = spark.sql("""
            SELECT distinct start_time,
            hour(start_time) as hour   ,
            dayofmonth(start_time) as day,
            weekofyear(start_time) as week,
            month(start_time) as month,
            year(start_time) as year, 
            dayofweek(start_time) as weekday
            from log_data
            order by year, month
            """)

In [27]:
# Preview the first 3 rows of the time table
time_table.show(3)

In [28]:
# write time table to parquet files partitioned by year and month
# partitionBy was missing, so the output was not partitioned as described.
# mode("overwrite") replaces output from an earlier run; the default save mode
# raises when the path already exists, which made the notebook fail on re-run.
time_table.write.mode("overwrite").partitionBy("year", "month").parquet("data/output/time_table")

In [29]:
#Construct an object to transform the start_time to year and month formats 
import tempfile
from pyspark.sql.functions import col, dayofmonth, month, year

# start_time viewed as a date; the year/month columns are derived from it
dt = col("start_time").cast("date")
# (extractor function, output column name) pairs
fname = [(year, "year"), (month, "month")]
# Select list: every existing column, followed by one derived column per pair
exprs = [col("*")]
exprs.extend(extract(dt).alias(label) for extract, label in fname)

In [30]:
# Songplays fact table: one row per song-play event in log_data, enriched with
# the matching song_id / artist_id from song_data.
# The song is matched in ONE join on both title and artist name. Two separate
# joins (title only, artist name only) multiplied rows for every duplicate
# title or artist and could pair a song_id with an unrelated artist_id.
songplays_table = spark.sql("""
          select monotonically_increasing_id() as songplay_id , 
                l.start_time, 
                l.userId as user_id, 
                l.level, 
                s.song_id, 
                s.artist_id, 
                l.sessionId as session_id, 
                l.location, 
                l.userAgent as user_agent
           from log_data l
           left join song_data s
             on l.song = s.title
            and l.artist = s.artist_name
           order by year(l.start_time), month(l.start_time)
""")

In [31]:
#Build the partition tables: add the year/month columns from exprs and write
#songplays as parquet partitioned by them.
#mode("overwrite") replaces output from an earlier run; the default save mode
#raises when the path already exists, which made the notebook fail on re-run.
songplays_table\
       .select(*exprs)\
       .write\
       .mode("overwrite")\
       .partitionBy(*(name for _, name in fname))\
       .format("parquet")\
       .save("data/output/songplays")

In [32]:
# Read back one songplays partition (year=2018, month=11), presumably as a check of the write
haha=spark.read.parquet("data/output/songplays/year=2018/month=11/*")

In [33]:
# Preview the first 5 rows of the partition read back above
haha.show(5)