In [130]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, from_unixtime, to_timestamp, dayofweek
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, DateType, FloatType, TimestampType

In [25]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

In [26]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()

In [27]:
song_data = "./data/song_data/*/*/*/*.json"

In [28]:
song_data_schema = StructType([
    StructField("num_songs",         IntegerType()),
    StructField("artist_id",         StringType()),
    StructField("artist_latitude",   FloatType()),
    StructField("artist_longitude",  FloatType()),
    StructField("artist_location",   StringType()),
    StructField("artist_name",       StringType()),
    StructField("song_id",           StringType()),
    StructField("title",             StringType()),
    StructField("duration",          FloatType()),
    StructField("year",              IntegerType()),
    ])

In [29]:
df = spark.read.json(song_data, schema=song_data_schema)

In [30]:
df.show(5)

+---------+------------------+---------------+----------------+-----------------+--------------------+------------------+--------------------+---------+----+
|num_songs|         artist_id|artist_latitude|artist_longitude|  artist_location|         artist_name|           song_id|               title| duration|year|
+---------+------------------+---------------+----------------+-----------------+--------------------+------------------+--------------------+---------+----+
|        1|ARDR4AC1187FB371A1|           null|            null|                 |Montserrat Caball...|SOBAYLL12A8C138AF9|Sono andati? Fing...|511.16364|   0|
|        1|AREBBGV1187FB523D2|           null|            null|      Houston, TX|Mike Jones (Featu...|SOOLYAZ12A6701F4A6|Laws Patrolling (...|173.66159|   0|
|        1|ARMAC4T1187FB3FA4C|       40.82624|       -74.47995|Morris Plains, NJ|The Dillinger Esc...|SOBBUGU12A8C13E95D|Setting Fire to S...|207.77751|2004|
|        1|ARPBNLO1187FB3D52F|       40.71455|      

In [48]:
df.count()

71

In [54]:
songs_table = df.select("song_id", "title", "artist_id", "year", col("duration"))

In [55]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16364|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48772|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [49]:
songs_table.count()

71

In [59]:
artists_table = df.select("artist_id",
                              col("artist_name").alias("name"),
                              col("artist_location").alias("location"),
                              col("artist_latitude").alias("latitude"),
                              col("artist_longitude").alias("longitude")).distinct()

In [60]:

log_data = "./data/log_data"

In [90]:
    df = spark.read.json(log_data)


In [91]:
    df = df.filter("page == 'NextSong'")


In [92]:
df.count()

6820

In [93]:
    users_table = df.select(col("userId").alias("user_id"),
                            col("firstName").alias("first_name"),
                            col("lastName").alias("last_name"),
                            col("gender"),
                            col("level")).distinct()

In [94]:
users_table.show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     71|    Ayleen|     Wise|     F| free|
|     81|    Sienna|    Colon|     F| free|
|     87|    Dustin|      Lee|     M| free|
|     23|    Morris|  Gilmore|     M| free|
|     75|    Joseph|Gutierrez|     M| free|
|     16|     Rylan|   George|     M| paid|
|      2|   Jizelle| Benjamin|     F| free|
|      3|     Isaac|   Valdez|     M| free|
|     54|     Kaleb|     Cook|     M| free|
|     79|     James|   Martin|     M| free|
|     80|     Tegan|   Levine|     F| paid|
|     77| Magdalene|   Herman|     F| free|
|     47|    Kimber|   Norris|     F| free|
|     30|     Avery|  Watkins|     F| paid|
|     22|      Sean|   Wilson|     F| free|
|      4|    Alivia|  Terrell|     F| free|
|     55|    Martin|  Johnson|     M| free|
|     20|     Aiden|  Ramirez|  

In [95]:
df.show(2)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [120]:
get_timestamp = udf(lambda x: from_unixtime(x/1000),  StringType())

In [121]:
df = df.na.drop(subset=["ts"])

In [124]:
df = df.withColumn("timestamp", from_unixtime(col("ts")/1000))

In [125]:
df.show(2)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+-------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|          timestamp|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+-------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 01:30:26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|S

In [129]:
df.select(year(col("timestamp"))).show()

+---------------+
|year(timestamp)|
+---------------+
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
|           2018|
+---------------+
only showing top 20 rows



In [133]:
time_table = df.select(col("timestamp").alias("start_time"),
                           hour(col("timestamp")).alias("hour"),
                           dayofmonth(col("timestamp")).alias("day"),
                           weekofyear(col("timestamp")).alias("week"),
                           month(col("timestamp")).alias("month"),
                           year(col("timestamp")).alias("year"),
                           dayofweek(col("timestamp")).alias("weekday")
                           ).distinct()

In [135]:
time_table.show()

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 12:37:52|  12| 15|  46|   11|2018|      5|
|2018-11-15 13:44:06|  13| 15|  46|   11|2018|      5|
|2018-11-15 17:01:10|  17| 15|  46|   11|2018|      5|
|2018-11-15 17:42:27|  17| 15|  46|   11|2018|      5|
|2018-11-21 09:11:10|   9| 21|  47|   11|2018|      4|
|2018-11-21 23:09:04|  23| 21|  47|   11|2018|      4|
|2018-11-14 13:33:18|  13| 14|  46|   11|2018|      4|
|2018-11-28 12:41:40|  12| 28|  48|   11|2018|      4|
|2018-11-05 14:54:56|  14|  5|  45|   11|2018|      2|
|2018-11-30 06:15:12|   6| 30|  48|   11|2018|      6|
|2018-11-30 07:48:28|   7| 30|  48|   11|2018|      6|
|2018-11-30 11:35:18|  11| 30|  48|   11|2018|      6|
|2018-11-16 19:30:49|  19| 16|  46|   11|2018|      6|
|2018-11-16 19:34:09|  19| 16|  46|   11|2018|      6|
|2018-11-20 09:11:14|   9| 20|  47|   11|2018|      3|
|2018-11-2