In [42]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import types as t

config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it actually parsed. Fail fast here instead of with a KeyError on
# config['LOCAL'] further down.
loaded_configs = config.read('dl.cfg')
if not loaded_configs:
    raise FileNotFoundError("Could not read config file 'dl.cfg'")
loaded_configs

['dl.cfg']

In [2]:
def create_spark_session():
    """Return the active SparkSession, building one if none exists yet.

    The builder requests the ``org.apache.hadoop:hadoop-aws:2.7.0`` package through
    ``spark.jars.packages`` (presumably for S3 access; TODO confirm it is still needed
    for the LOCAL paths used below).
    """
    hadoop_aws_package = "org.apache.hadoop:hadoop-aws:2.7.0"
    builder = SparkSession.builder.config("spark.jars.packages", hadoop_aws_package)
    return builder.getOrCreate()


spark = create_spark_session()

# Process log_data

In [33]:
# Input/output locations from the [LOCAL] section of dl.cfg.
input_data, output_data, song_data_input = (
    config['LOCAL'][key] for key in ('LOG_DATA', 'OUTPUT_DATA', 'SONG_DATA')
)

In [5]:
# Load every JSON log file matched by the LOG_DATA path into one DataFrame
# (no explicit schema is given, so Spark infers it).
df = spark.read.json(input_data)

In [6]:
# Inspect the inferred schema of the raw log events.
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [10]:
# Keep only song-play events (page == 'NextSong').
df = df.where(df['page'] == 'NextSong')

## Process users

In [11]:
# Expose the filtered NextSong events to SQL. Despite its name, this view holds the
# log events themselves, not the users table; the next cell selects from it.
df.createOrReplaceTempView("users_table")

In [13]:
%%time
users_table = spark.sql("""
                            SELECT  DISTINCT userId    AS user_id,
                                             firstName AS first_name,
                                             lastName  AS last_name,
                                             gender,
                                             level
                            FROM users_table
                            """)

CPU times: user 1.27 ms, sys: 268 µs, total: 1.54 ms
Wall time: 31.6 ms


In [14]:
%%time
users_path = output_data + "users_table"
users_table.write.mode("overwrite").parquet(users_path)

CPU times: user 3.7 ms, sys: 0 ns, total: 3.7 ms
Wall time: 12.4 s


## Process time table

In [27]:
@udf(t.TimestampType())
def get_timestamp(ts):
    """Convert an epoch-milliseconds value to a datetime.

    Uses ``datetime.fromtimestamp`` with no tz, i.e. the Python worker's local
    timezone. Returns None for a null ``ts`` (the column is nullable), so a
    missing value becomes a null timestamp instead of a TypeError that fails
    the whole Spark job.
    """
    if ts is None:
        return None
    return datetime.fromtimestamp(ts / 1000.0)


@udf(t.StringType())
def get_datetime(ts):
    """Format an epoch-milliseconds value as 'YYYY-MM-DD HH:MM:SS' (sub-second part dropped).

    Returns None for a null ``ts``.
    """
    if ts is None:
        return None
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')

In [28]:
# Derive `timestamp` (TimestampType) and `datetime` (string) from the epoch-ms `ts` column.
df = (
    df
    .withColumn('timestamp', get_timestamp('ts'))
    .withColumn('datetime', get_datetime('ts'))
)
df.printSchema()
df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-----

In [23]:
# Untruncated view of the new timestamp column, so the milliseconds are visible.
df.select(col('timestamp')).show(n=5, truncate=False)

+-----------------------+
|timestamp              |
+-----------------------+
|2018-11-15 00:30:26.796|
|2018-11-15 00:41:21.796|
|2018-11-15 00:45:41.796|
|2018-11-15 03:44:09.796|
|2018-11-15 05:48:55.796|
+-----------------------+
only showing top 5 rows



In [29]:
%%time
df.createOrReplaceTempView("time_table")
time_table = spark.sql("""SELECT  DISTINCT datetime AS start_time,
                             hour(timestamp) AS hour,
                             day(timestamp)  AS day,
                             weekofyear(timestamp) AS week,
                             month(timestamp) AS month,
                             year(timestamp) AS year,
                             dayofweek(timestamp) AS weekday
                          FROM time_table""")

CPU times: user 1.25 ms, sys: 248 µs, total: 1.5 ms
Wall time: 54.2 ms


In [30]:
# Preview the first 5 rows of the time dimension.
time_table.show(5)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 11:35:35|  11| 15|  46|   11|2018|      5|
|2018-11-21 09:36:21|   9| 21|  47|   11|2018|      4|
|2018-11-14 05:11:42|   5| 14|  46|   11|2018|      4|
|2018-11-14 08:10:20|   8| 14|  46|   11|2018|      4|
|2018-11-28 22:24:08|  22| 28|  48|   11|2018|      4|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [32]:
%%time
time_table_path = output_data + "time_table"
time_table.write.mode("overwrite").partitionBy("year", "month")\
            .parquet(time_table_path)

CPU times: user 20.4 ms, sys: 16.6 ms, total: 37 ms
Wall time: 10.1 s


## Process songplays

In [35]:
%%time
# Load the song metadata JSON files; joined to the log events below to resolve song_id/artist_id.
song_df = spark.read.json(song_data_input)

CPU times: user 1.96 ms, sys: 420 µs, total: 2.38 ms
Wall time: 1.72 s


In [62]:
# Number of song records loaded.
song_df.count()

71

In [67]:
%%time
joined = df.join(song_df, (df.artist == song_df.artist_name) & (df.song == song_df.title))

CPU times: user 1.93 ms, sys: 418 µs, total: 2.35 ms
Wall time: 24.9 ms


In [68]:
# Surrogate key for the songplays fact table. Per the Spark docs,
# monotonically_increasing_id() values are unique but not consecutive.
joined = joined.withColumn("songplay_id", monotonically_increasing_id())

In [69]:
# How many log events matched a song record.
joined.count()

1

In [71]:
# Build the songplays fact table from the joined events. start_time is the
# TimestampType `timestamp` column; year/month are derived from it and are later
# used as the parquet partition columns.
joined.createOrReplaceTempView("songplays_table")
songplays_table = spark.sql("""
                                SELECT  songplay_id AS songplay_id,
                                        timestamp   AS start_time,
                                        year(timestamp) AS year,
                                        month(timestamp) AS month, 
                                        userId      AS user_id,
                                        level       AS level,
                                        song_id     AS song_id,
                                        artist_id   AS artist_id,
                                        sessionId   AS session_id,
                                        location    AS location,
                                        userAgent   AS user_agent
                                FROM songplays_table
                                """)

In [72]:
# Confirm the fact table's column names and types before writing it.
songplays_table.printSchema()

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [73]:
%%time
songplays_table_path = output_data + "songplays_table"
songplays_table.write.mode("overwrite").partitionBy("year", "month")\
            .parquet(songplays_table_path)

CPU times: user 2.8 ms, sys: 0 ns, total: 2.8 ms
Wall time: 1.02 s


## Full pipeline: all steps combined in `process_log_data`

In [89]:
def process_log_data(spark, input_data, song_input, output_data):
    """Build the users, time and songplays tables from the event logs and write them as parquet.

    Args:
        spark: SparkSession used for reading, SQL queries and writing.
        input_data: path/glob of the JSON log files (read with ``spark.read.json``).
        song_input: path/glob of the JSON song files. They are joined to the logs on
            artist name and song title to obtain ``song_id`` / ``artist_id``.
        output_data: output prefix. Table names are appended by plain string
            concatenation, so it should end with a path separator.

    Side effects:
        - Registers the temp views ``users_table``, ``time_table`` and
          ``songplays_table`` on the session.
        - Prints progress, schemas and sample rows.
        - Overwrites ``<output_data>users_table``.
        - Overwrites ``<output_data>time_table``, partitioned by year/month.
        - Overwrites ``<output_data>songplays_table``, partitioned by year/month.
    """

    def _report_finished(start):
        # Print wall-clock seconds elapsed since `start` in this pipeline's progress format.
        print('finished:', f"{(datetime.now() - start).total_seconds()} s")

    # read log data file
    print('Start reading logs')
    start = datetime.now()
    df = spark.read.json(input_data)
    _report_finished(start)
    df.printSchema()
    print('------------')

    # filter by actions for song plays
    df = df.filter(df.page == 'NextSong')

    # Users table: one row per user, using the attributes of that user's latest event
    # (highest ts). DISTINCT over all columns would emit two rows for a user whose
    # `level` changed between events, breaking user_id uniqueness.
    print('Start users columns extracting')
    start = datetime.now()
    df.createOrReplaceTempView("users_table")
    users_table = spark.sql("""
        SELECT user_id, first_name, last_name, gender, level
        FROM (
            SELECT userId    AS user_id,
                   firstName AS first_name,
                   lastName  AS last_name,
                   gender,
                   level,
                   ROW_NUMBER() OVER (PARTITION BY userId ORDER BY ts DESC) AS rn
            FROM users_table
        ) AS ranked
        WHERE rn = 1""")
    _report_finished(start)
    users_table.printSchema()
    users_table.show(5)
    print('------------')

    # write users table to parquet files
    print('Start users writing')
    start = datetime.now()
    users_path = output_data + "users_table"
    users_table.write.mode("overwrite").parquet(users_path)
    _report_finished(start)
    print('------------')

    # create a TimestampType column from the epoch-millisecond `ts` column
    print('start adding new timestamp column')
    start = datetime.now()

    @udf(t.TimestampType())
    def get_timestamp(ts):
        # ts is nullable: map null to null instead of raising TypeError and failing the job.
        if ts is None:
            return None
        return datetime.fromtimestamp(ts / 1000.0)

    df = df.withColumn('timestamp', get_timestamp('ts'))
    _report_finished(start)
    df.printSchema()
    print('------------')

    # Time table. start_time is the full-precision `timestamp` column, the same value
    # songplays.start_time gets below, so the fact and time tables can be equi-joined.
    # A seconds-resolution string would drop the milliseconds and never match.
    print('Start time columns extracting')
    start = datetime.now()
    df.createOrReplaceTempView("time_table")
    time_table = spark.sql("""SELECT  DISTINCT timestamp AS start_time,
                             hour(timestamp) AS hour,
                             day(timestamp)  AS day,
                             weekofyear(timestamp) AS week,
                             month(timestamp) AS month,
                             year(timestamp) AS year,
                             dayofweek(timestamp) AS weekday
                          FROM time_table""")
    _report_finished(start)
    time_table.printSchema()
    time_table.show(5)
    print('------------')

    # write time table to parquet files partitioned by year and month
    print('Start time table writting')
    start = datetime.now()
    time_table_path = output_data + "time_table"
    time_table.write.mode("overwrite").partitionBy("year", "month")\
            .parquet(time_table_path)
    _report_finished(start)
    print('------------')

    # read in song data to use for songplays table
    print('Start song data reading')
    start = datetime.now()
    song_df = spark.read.json(song_input)
    _report_finished(start)
    print('------------')

    # extract columns from joined song and log datasets to create songplays table
    print('Start songplays columns exctraction')
    start = datetime.now()
    # NOTE(review): this inner join drops every NextSong event without a matching song
    # record. If songplays should contain all plays (with null song_id/artist_id),
    # switch to how='left'. TODO confirm the intended semantics.
    joined = df.join(song_df, (df.artist == song_df.artist_name) & (df.song == song_df.title))
    joined = joined.withColumn("songplay_id", monotonically_increasing_id())
    joined.createOrReplaceTempView("songplays_table")
    songplays_table = spark.sql("""
                                SELECT  songplay_id AS songplay_id,
                                        timestamp   AS start_time,
                                        year(timestamp) AS year,
                                        month(timestamp) AS month,
                                        userId      AS user_id,
                                        level       AS level,
                                        song_id     AS song_id,
                                        artist_id   AS artist_id,
                                        sessionId   AS session_id,
                                        location    AS location,
                                        userAgent   AS user_agent
                                FROM songplays_table
                                """)
    _report_finished(start)
    songplays_table.printSchema()
    songplays_table.show(5)
    print('------------')

    # write songplays table to parquet files partitioned by year and month
    print('Start songplays writting')
    start = datetime.now()
    songplays_table_path = output_data + "songplays_table"
    songplays_table.write.mode("overwrite").partitionBy("year", "month")\
            .parquet(songplays_table_path)
    _report_finished(start)
    print('------------')

In [90]:
# Run the whole log pipeline end to end with the paths read from dl.cfg.
process_log_data(
    spark,
    input_data=input_data,
    song_input=song_data_input,
    output_data=output_data,
)

Start reading logs
finished: 0.312452 s
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

------------
Start users columns extracting
finished: 0.024321 s
root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = tr

AttributeError: 'DataFrame' object has no attribute 'wtime_table'