<h1>Data exploration with spark</h1>
This is just to get taste of how the data looks and what formats it uses.


In [1]:
import uuid
from datetime import datetime
print(f'test {datetime.now()}')

test 2020-06-20 17:11:24.542307


<h2> Imports </h2>

In [2]:
import os
import configparser
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import col, row_number, last, from_unixtime, year, month, dayofmonth, hour, weekofyear, \
    dayofweek, udf
from pyspark.sql.window import Window

In [3]:
def create_spark_session():
    return SparkSession \
        .builder \
        .getOrCreate()

In [5]:
config = configparser.ConfigParser()
config.read_file(open('../src/dl.cfg'))
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']
spark = create_spark_session()

In [6]:
log_df = spark.read.json('../data/log_data')

In [7]:
log_df.cache()
log_df.show()



+--------------------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|      auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Harmonia| Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|         The Prodigy| Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Su

In [12]:
log_df.printSchema()


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [8]:
schema = StructType([
    StructField("artist_id", StringType()),
    StructField("artist_latitude", FloatType()),
    StructField("artist_location", StringType()),
    StructField("artist_longitude", FloatType()),
    StructField("artist_name", StringType()),
    StructField("user_agent", StringType()),
    StructField("num_songs", IntegerType()),
    StructField("song_id", StringType()),
    StructField("title", StringType()),
    StructField("year", IntegerType())
])
song_df = spark.read.json('../data/song_data/*/*/*/*.json', schema=schema)\
    .dropDuplicates(["song_id"])
# song_df.cache()

In [18]:
song_df.show()
song_df.printSchema()

+------------------+---------------+-----------------+----------------+--------------------+----------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name|user_agent| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+----------+---------+---------+------------------+--------------------+----+
|ARGUVEV1187B98BA17|           null|                 |            null|      Sierra Maestra|      null|313.12933|        1|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|1997|
|ARD7TVE1187B99BFB1|           null|  California - LA|            null|              Casual|      null| 218.9318|        1|SOMZWCG12A8C13C480|    I Didn't Mean To|   0|
|ARJIE2Y1187B994AB7|           null|                 |            null|         Line Renaud|      null|152.92036|        1|SOUPIRU12A6D4FA1E1| Der Kleine D

<h3>Processing song data</h3>

Song table
Requirements:

* song_id, title, artist_id, year, duration

In [17]:
song_table_df = song_df.select(
    song_df.song_id,
    song_df.title,
    song_df.artist_id,
    song_df.year,
    song_df.duration
)
song_table_df.show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16364|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48772|
|SONWXQJ12A8C134D94|The Ballad Of Sle...|ARNF6401187FB57032|1994|  305.162|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|ARLTWXK1187FB5A3F8|   0|326.00772|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|ARPFHN61187FB575F6|   0|279.97995|
|SODUJBS12A8C132150|Wessex Loses a Bride|ARI2JSK1187FB496EF|   0|111.62077|
|SOBZBAZ12A6D4F8742|      Spanish Grease|AROUOZZ1187B9ABE51|1997|168.25424|
|SOGXHEG12AB

Artists table
Requirements:

* artist_id, name, location, lattitude, longitude

In [19]:
artists_table_df = song_df.select(
    song_df.artist_id,
    song_df.artist_name.alias('name'),
    song_df.artist_location.alias('location'),
    song_df.artist_latitude.alias('latitude'),
    song_df.artist_longitude.alias('longitude')
)
artists_table_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)



<h2>Logs</h2>
song plays

In [20]:
user_id_by_ts_window = Window.partitionBy(
    col('userId'))\
    .orderBy(col('ts'))
user_id_by_ts_window_ranged = Window.partitionBy(
    col('userId'))\
    .orderBy(col('ts').desc())\
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
next_song_log_df = log_df.filter(col('page') == 'NextSong')
next_song_log_df = next_song_log_df\
    .withColumn('user_row_num', row_number().over(user_id_by_ts_window))\
    .withColumn('firstName', last('firstName').over(user_id_by_ts_window_ranged))\
    .withColumn('lastName', last('lastName').over(user_id_by_ts_window_ranged))\
    .withColumn('gender', last('gender').over(user_id_by_ts_window_ranged))\
    .withColumn('level', last('level').over(user_id_by_ts_window_ranged))\
    .select('firstName', 'lastName', 'gender', 'level', 'userid','user_row_num')\
    .where(col('user_row_num') == 1)
next_song_log_df.printSchema()

root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)
 |-- userid: string (nullable = true)
 |-- user_row_num: integer (nullable = true)



Artist
artist_id, name, location, lattitude, longitude

In [23]:
songs_df = song_df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude')\
    .withColumnRenamed('artist_name','name')\
    .withColumnRenamed('artist_location','location')\
    .withColumnRenamed('artist_latitude','latitude')\
    .withColumnRenamed('artist_longitude','longitude')\
    .dropDuplicates(['artist_id'])
songs_df.show()

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|AR9AWNF1187B9AB0B4|Kenny G featuring...|Seattle, Washingt...|    null|     null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|  8.4177|-80.11278|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|30.08615|-94.10158|
|AREDL271187FB40F44|        Soul Mekanik|                    |    null|     null|
|ARI3BMM1187FB4255E|        Alice Stuart|          Washington| 38.8991|  -77.029|
|AR7SMBG1187B9B9066|         Los Manolos|                    |    null|     null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|40.82624|-74.47995|
|ARNTLGG11E2835DDB9|                 Clp|                    |    null|     null|
|ARKRRTF1187B9984DA|    Sonora Santanera|                    |    null|     null|
|AR051KA1187B98B

time start_time, hour, day, week, month, year, weekday

In [8]:
time_df = log_df.withColumn('ts', col('ts')/1000).select(col('ts').cast(IntegerType()))\
    .dropDuplicates().withColumnRenamed('ts','start_time')
time_df = time_df\
    .withColumn('ts_as_datetime', from_unixtime(col('start_time'), 'yyyy-MM-dd HH:mm:ss').cast(TimestampType()))\
    .withColumn('hour', hour(col('ts_as_datetime')))\
    .withColumn('day', dayofmonth(col('ts_as_datetime')))\
    .withColumn('week', weekofyear(col('ts_as_datetime')))\
    .withColumn('month', month(col('ts_as_datetime')))\
    .withColumn('houyear', year(col('ts_as_datetime')))\
    .withColumn('weekday', weekofyear(col('ts_as_datetime')))
time_df.show()
time_df.printSchema()

+----------+-------------------+----+---+----+-----+-------+-------+
|start_time|     ts_as_datetime|hour|day|week|month|houyear|weekday|
+----------+-------------------+----+---+----+-----+-------+-------+
|1542303240|2018-11-16 04:34:00|   4| 16|  46|   11|   2018|     46|
|1542310732|2018-11-16 06:38:52|   6| 16|  46|   11|   2018|     46|
|1542777793|2018-11-21 16:23:13|  16| 21|  47|   11|   2018|     47|
|1542789405|2018-11-21 19:36:45|  19| 21|  47|   11|   2018|     47|
|1542809782|2018-11-22 01:16:22|   1| 22|  47|   11|   2018|     47|
|1542166466|2018-11-14 14:34:26|  14| 14|  46|   11|   2018|     46|
|1542180760|2018-11-14 18:32:40|  18| 14|  46|   11|   2018|     46|
|1542219957|2018-11-15 05:25:57|   5| 15|  46|   11|   2018|     46|
|1542226599|2018-11-15 07:16:39|   7| 15|  46|   11|   2018|     46|
|1543369249|2018-11-28 12:40:49|  12| 28|  48|   11|   2018|     48|
|1543423894|2018-11-29 03:51:34|   3| 29|  48|   11|   2018|     48|
|1541417478|2018-11-05 22:31:18|  

In [14]:
uuidUdf= udf(lambda : str(uuid.uuid4()),StringType())
time_df.withColumn("id",uuidUdf()).show()

+----------+-------------------+----+---+----+-----+-------+-------+--------------------+
|start_time|     ts_as_datetime|hour|day|week|month|houyear|weekday|                  id|
+----------+-------------------+----+---+----+-----+-------+-------+--------------------+
|1542303240|2018-11-16 04:34:00|   4| 16|  46|   11|   2018|     46|5478d126-3a2e-4a7...|
|1542310732|2018-11-16 06:38:52|   6| 16|  46|   11|   2018|     46|9c9d13b6-6f75-490...|
|1542777793|2018-11-21 16:23:13|  16| 21|  47|   11|   2018|     47|63a41073-6c3a-434...|
|1542789405|2018-11-21 19:36:45|  19| 21|  47|   11|   2018|     47|ee341bec-001e-418...|
|1542809782|2018-11-22 01:16:22|   1| 22|  47|   11|   2018|     47|e3e62b54-9914-452...|
|1542166466|2018-11-14 14:34:26|  14| 14|  46|   11|   2018|     46|87411f2a-4647-484...|
|1542180760|2018-11-14 18:32:40|  18| 14|  46|   11|   2018|     46|fd805ea7-6099-4f2...|
|1542219957|2018-11-15 05:25:57|   5| 15|  46|   11|   2018|     46|6bca7446-70e7-471...|
|154222659

In [2]:
import boto3
import os

def download_dir(client, resource, dist, local='c:\\tmp', bucket='your_bucket'):
    paginator = client.get_paginator('list_objects')
    for result in paginator.paginate(Bucket=bucket, Delimiter='/', Prefix=dist):
        if result.get('CommonPrefixes') is not None:
            for subdir in result.get('CommonPrefixes'):
                download_dir(client, resource, subdir.get('Prefix'), local, bucket)
        for file in result.get('Contents', []):
            dest_pathname = os.path.join(local, file.get('Key'))
            if not os.path.exists(os.path.dirname(dest_pathname)):
                os.makedirs(os.path.dirname(dest_pathname))
            resource.meta.client.download_file(bucket, file.get('Key'), dest_pathname)

def _start():
    client = boto3.client('s3')
    resource = boto3.resource('s3')
    download_dir(client, resource, 'song_data/', 'c:\\tmp', bucket='udacity-dend')

In [None]:
_start()

In [52]:
def read_song_data(spark, input_data):
    """
    Convenience wrapper function to get a read dataframe for the song data json files
    :param spark: spark session to execute queries against
    :param input_data: s3 bucket (formatted) location of the input data
    :return: spark dataframe with the loaded song data
    """
    song_data = f"{input_data}/song_data/*/*/*/*.json"

    schema = StructType([
        StructField("artist_id", StringType()),
        StructField("artist_latitude", FloatType()),
        StructField("artist_location", StringType()),
        StructField("artist_longitude", FloatType()),
        StructField("artist_name", StringType()),
        StructField("duration", FloatType()),
        StructField("num_songs", IntegerType()),
        StructField("song_id", StringType()),
        StructField("title", StringType()),
        StructField("year", IntegerType())
    ])
    return spark.read.json(song_data, schema=schema)

def process_log_data(spark, input_data, output_data):
    """
    The function that starts processing of the log data set
    :param spark: spark session to execute queries against
    :param input_data: s3 bucket (formatted) location of the input data
    :param output_data: s3 bucket (formatted) of the output data
    """
    # get filepath to log data file
    log_data = f"{input_data}/log_data/"

    # read log data file
    log_df = spark.read.json(log_data)

    # filter by actions for song plays
    next_song_log_df = log_df.filter(col('page') == 'NextSong')

    # extract columns for users table
    user_id_by_ts_window = Window.partitionBy(
        col('userId')) \
        .orderBy(col('ts'))
    user_id_by_ts_window_ranged = Window.partitionBy(
        col('userId')) \
        .orderBy(col('ts').desc()) \
        .rangeBetween(Window.unboundedPreceding, Window.currentRow)

    users_df = next_song_log_df \
        .withColumn('user_row_num', row_number().over(user_id_by_ts_window)) \
        .withColumn('firstName', last('firstName').over(user_id_by_ts_window_ranged)) \
        .withColumn('lastName', last('lastName').over(user_id_by_ts_window_ranged)) \
        .withColumn('gender', last('gender').over(user_id_by_ts_window_ranged)) \
        .withColumn('level', last('level').over(user_id_by_ts_window_ranged)) \
        .select('firstName', 'lastName', 'gender', 'level', 'userid', 'user_row_num') \
        .where(col('user_row_num') == 1)

    # write users table to parquet files
    # users_df \
    #     .write \
    #     .mode("overwrite") \
    #     .parquet(f"{output_data}/users_table")

    # create timestamp column from original timestamp column
    log_with_time_df = log_df\
        .withColumn('start_time',
                    (col('ts') / 1000).cast(IntegerType()))\
        .dropDuplicates()
    log_with_time_df = log_with_time_df \
        .withColumn('ts_as_datetime', from_unixtime(col('start_time'), 'yyyy-MM-dd HH:mm:ss').cast(TimestampType())) \
        .withColumn('hour', hour(col('ts_as_datetime'))) \
        .withColumn('day', dayofmonth(col('ts_as_datetime'))) \
        .withColumn('week', weekofyear(col('ts_as_datetime'))) \
        .withColumn('month', month(col('ts_as_datetime'))) \
        .withColumn('year', year(col('ts_as_datetime'))) \
        .withColumn('weekday', weekofyear(col('ts_as_datetime')))

    # write time table to parquet files partitioned by year and month
    # log_with_time_df \
    #     .write \
    #     .partitionBy('year', 'month') \
    #     .mode("overwrite") \
    #     .parquet(f"{output_data}/time_table")

    # read in song data to use for songplays table
    song_df = read_song_data(spark, input_data)
    song_df = song_df.drop('year')
    log_with_time_df = log_with_time_df.withColumn('userAgent', (col('userAgent').cast(StringType())))
    log_with_time_df.printSchema()
    song_df.printSchema()

    # extract columns from joined song and log datasets to create songplays table
    uuidUdf = udf(lambda: str(uuid.uuid4()), StringType())
    songplays_table = log_with_time_df \
        .join(song_df, song_df.title == log_df.song) \
        .withColumn('songplay_id', uuidUdf())\
        .select(
            col('songplay_id').alias('songplay_id'),
            col('start_time').alias('start_time'),
            col('userId').alias('user_id'),
            col('level').alias('level'),
            col('song_id').alias('song_id'),
            col('artist_id').alias('artist_id'),
            col('sessionId').alias('session_id'),
            col('location').alias('location'),
            col('userAgent').alias('user_agent'),
            col('year').alias('year'),
            col('month').alias('month'),
        )

    songplays_table.printSchema()

In [53]:
process_log_data(spark, '../data', '../data/output')


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: integer (nullable = true)
 |-- ts_as_datetime: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (null