In [1]:
import configparser
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
# Load AWS credentials from the local 'dl.cfg' file and export them as
# environment variables (presumably consumed by the s3a reads/writes below).
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed. Check that list so a missing config fails with
# a clear message instead of an opaque KeyError on 'EMR' further down.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Config file 'dl.cfg' was not found or could not be read.")

os.environ['AWS_ACCESS_KEY_ID'] = config['EMR']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['EMR']['AWS_SECRET_ACCESS_KEY']

In [3]:
def create_spark_session():
    """
    Build a Spark session, or return the already-running one.

    A script launched from the command line has to start its own session,
    whereas a jupyter notebook usually has one started in the background.
    The builder requests the hadoop-aws package through
    ``spark.jars.packages``.

    :return: spark session obj
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    # getOrCreate() hands back an existing session when there is one.
    return builder.getOrCreate()

In [4]:
# Create (or reuse) the Spark session used by every read and write below.
spark = create_spark_session()

In [5]:
input_data = "s3a://tom-dend-bucket-615"  # S3 bucket the song and log JSON files are read from
output_data = "s3a://tom-dend-bucket-918"  # S3 bucket the parquet output tables are written to

In [6]:
# Glob pattern matching the song JSON files three folder levels below song_data/
stage_song_pth = f"{input_data}/song_data/*/*/*/*.json"
# Load the song JSON files into a DataFrame
df = spark.read.json(stage_song_pth)  # TODO: confirm that wildcard matching works when reading song_data
print("Success: Read data from S3.")

# Songs table: keep the song attributes and drop fully identical rows
songs_table = (
    df
    .select("song_id", "title", "artist_id", "year", "duration")
    .dropDuplicates()
)

Success: Read data from S3.


In [7]:
# Persist the songs table as parquet, partitioned by year and then artist_id;
# any existing output at this path is overwritten.
(
    songs_table.write
    .mode("overwrite")
    .partitionBy("year", "artist_id")
    .parquet(f"{output_data}/song_table")
)
print("Success: Wrote song_table to parquet format.")

Success: Wrote song_table to parquet format.


In [8]:
# Artists table: keep the artist attributes from the song data and drop
# fully identical rows
artists_table = (
    df
    .select(
        "artist_id",
        "artist_name",
        "artist_location",
        "artist_latitude",
        "artist_longitude",
    )
    .dropDuplicates()
)

In [9]:
# Persist the artists table as (unpartitioned) parquet, replacing any
# existing output at this path
artists_table.write.mode("overwrite").parquet(f"{output_data}/artist_table")
print("Success: Wrote artist_table to parquet format.")

Success: Wrote artist_table to parquet format.


In [15]:
# Glob pattern matching the log JSON files directly under log-data/ in the
# input bucket
stage_log_data_pth = f"{input_data}/log-data/*.json"

In [16]:
# Read the log JSON files into a DataFrame.
# Note: this rebinds `df`, which held the song data up to this point.
df = spark.read.json(stage_log_data_pth)
print("Success: Read data from S3.")

Success: Read data from S3.


In [17]:
# Keep only the log rows whose page is "NextSong", i.e. the song-play actions
df = df.where(F.col("page") == "NextSong")
print("Success: Filter log_data on NextSong.")

Success: Filter log_data on NextSong.


In [18]:
# Users table: keep the user attributes from the filtered log rows and drop
# fully identical rows
users_table = (
    df
    .select("userId", "firstName", "lastName", "gender", "level")
    .dropDuplicates()
)

In [19]:
# Persist the users table as (unpartitioned) parquet, replacing any existing
# output at this path
users_table.write.mode("overwrite").parquet(f"{output_data}/user_table")
print("Success: Wrote user_table to parquet format.")

Success: Wrote user_table to parquet format.


In [20]:
# Add a start_time column derived from the original "ts" column.
# ts is divided by 1000 before from_unixtime, which suggests it holds epoch
# milliseconds (TODO confirm). from_unixtime yields a string column, as
# df.printSchema() shows further down.
df = df.withColumn("start_time", F.from_unixtime(F.col("ts")/1000))
print("Success: Create column start_time from ts.")

# The time table is built in the following cell. The identical definition that
# used to live here as well was removed so it exists in exactly one place.

Success: Create column start_time from ts.


In [22]:
# Time table: ts and start_time plus the calendar parts derived from
# start_time, with fully identical rows dropped
time_table = (
    df
    .select(
        "ts",
        "start_time",
        F.hour("start_time").alias("hour"),
        F.dayofmonth("start_time").alias("day"),
        F.weekofyear("start_time").alias("week"),
        F.month("start_time").alias("month"),
        F.year("start_time").alias("year"),
        F.dayofweek("start_time").alias("weekday"),
    )
    .dropDuplicates()
)

In [23]:
# Persist the time table as parquet, partitioned by year and then month;
# any existing output at this path is overwritten
(
    time_table.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(f"{output_data}/time_table")
)
print("Success: Wrote time_table to parquet format.")


Success: Wrote time_table to parquet format.


In [24]:
# Read the songs table back from its root directory rather than through a
# "/*/*/*" glob. The table was written partitioned by year and artist_id, and
# those columns live only in the directory names. Spark's partition discovery
# starts at the paths it is given, so a glob that resolves straight to the
# leaf files silently drops both columns. Reading the root restores them.
song_table_pth = f"{output_data}/song_table"
song_table = spark.read.parquet(song_table_pth)
print("Success: Read in song_table from partitioned parquet file.")

Success: Read in song_table from partitioned parquet file.


In [25]:
# Read the artists table back from the parquet files under artist_table/.
# Note: this rebinds `artists_table`, replacing the DataFrame that was derived
# from the song data earlier with the one loaded from parquet.
artist_table_pth = f"{output_data}/artist_table/*"
artists_table = spark.read.parquet(artist_table_pth)
print("Success: Read in artist_table from non-partitioned parquet file.")

Success: Read in artist_table from non-partitioned parquet file.


In [26]:
# Register the DataFrames as temp views so they can be queried with spark.sql.
# View name -> DataFrame it exposes:
#   song_table         -> song_table (songs read back from parquet)
#   time_table         -> time_table (built from the log data)
#   user_table         -> users_table (built from the log data)
#   artist_table       -> artists_table (artists read back from parquet)
#   stage_events_table -> df (NextSong log rows, including start_time)
song_table.createOrReplaceTempView("song_table")
time_table.createOrReplaceTempView("time_table")
users_table.createOrReplaceTempView("user_table")
artists_table.createOrReplaceTempView("artist_table")
df.createOrReplaceTempView("stage_events_table")

In [30]:
# Inspect the columns and types of the artists table
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)



In [31]:
# Inspect the in-memory songs_table built from the song JSON data. This is not
# the song_table DataFrame that was read back from parquet.
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [32]:
# Inspect the filtered log DataFrame, including the added start_time column
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: string (nullable = true)



In [35]:
# Build the songplays table by joining the staged log events (e) with the
# time (t), user (u), song (s) and artist (a) temp views.
# - start_time, year and month come from time_table, matched on ts.
# - song_id is matched on song title only; artist_id on artist name only.
# - `select distinct` collapses identical result rows.
# NOTE(review): nothing ties the matched song to the matched artist, so a play
# could be paired with a song and an artist that do not belong together when
# titles or names collide. Consider also requiring s.artist_id = a.artist_id;
# that needs the song_table view to expose artist_id (TODO confirm).
# NOTE(review): all joins are inner joins, so events with no matching song
# title or artist name are dropped from the result. Confirm this is intended.
songplays_table = spark.sql("""
    select distinct
        t.start_time,
        t.year,
        t.month,
        u.userId,
        e.level,
        s.song_id,
        a.artist_id,
        e.sessionId,
        e.userAgent
    from
        stage_events_table as e
    inner join time_table t on t.ts = e.ts 
    inner join user_table u on e.userId = u.userId
    inner join song_table s on s.title = e.song
    inner join artist_table a on a.artist_name = e.artist
""")
print("Success: Process sql query.")

Success: Process sql query.


In [36]:
# Persist the songplays table as parquet, partitioned by year and then month;
# any existing output at this path is overwritten
(
    songplays_table.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(f"{output_data}/songplay_table")
)
print("Success: Wrote songplay_table to parquet format.")

Success: Wrote songplay_table to parquet format.
