In [144]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType

In [145]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
spark

In [146]:
input_data = "/home/workspace/data/more/"
output_data = "/home/workspace/data/output/"

In [147]:
song_data = input_data+"/song_data/*/*/*/*.json"

In [148]:
df = spark.read.json(song_data)

In [149]:
df.show(5)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                    |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|         Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|   Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.7145

In [150]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [151]:
#song_id, title, artist_id, year, duration
songs_table = df.select("song_id", "title", "artist_id", "year","duration").dropDuplicates()
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOTTDKS12AB018D69B|It Wont Be Christmas|ARMBR4Y1187B9990EB|   0|241.47546|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOIAZJW12AB01853F1|          Pink World|AR8ZCNI1187B9A069B|1984|269.81832|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [152]:
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [153]:
#songs_table.write.partitionBy("year", "artist_id").parquet(output_data + 'songs/')
songs_table.write.parquet(output_data + "songs/", mode="overwrite", partitionBy=["year","artist_id"])

In [154]:
#artist_id, name, location, lattitude, longitude
artists_table = df.selectExpr("artist_id", "artist_name as name", "artist_location as location", "artist_latitude as lattitude", "artist_longitude as longitude").dropDuplicates()
artists_table.show(5)

+------------------+---------------+--------------------+---------+---------+
|         artist_id|           name|            location|lattitude|longitude|
+------------------+---------------+--------------------+---------+---------+
|ARPFHN61187FB575F6|    Lupe Fiasco|         Chicago, IL| 41.88415|-87.63241|
|AR3JMC51187B9AE49D|Backstreet Boys|         Orlando, FL| 28.53823|-81.37739|
|ARXR32B1187FB57099|            Gob|                    |     null|     null|
|AROUOZZ1187B9ABE51|    Willie Bobo|New York, NY [Spa...| 40.79195|-73.94512|
|AROGWRA122988FEE45|Christos Dantis|                    |     null|     null|
+------------------+---------------+--------------------+---------+---------+
only showing top 5 rows



In [155]:
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- lattitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [156]:
artists_table.write.parquet(output_data + 'artists/')

In [157]:
log_data = input_data + "*.json"

In [159]:
df = spark.read.json(log_data)

In [160]:
df.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [162]:
df.count()

8056

In [161]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [163]:
df = df.filter(df.page == "NextSong")

In [164]:
df.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

In [165]:
#user_id, first_name, last_name, gender, level
users_table = df.selectExpr("userID as user_id", "firstName as first_name", "lastName as last_name", "gender", "level").dropDuplicates()

In [166]:
users_table.write.parquet(output_data + "users/")

In [167]:
get_timestamp = udf(lambda x : datetime.utcfromtimestamp(int(x)/1000), TimestampType())
df = df.withColumn("start_time", get_timestamp("ts"))

In [168]:
df.select("ts", "start_time").show(5)

+-------------+--------------------+
|           ts|          start_time|
+-------------+--------------------+
|1542241826796|2018-11-15 00:30:...|
|1542242481796|2018-11-15 00:41:...|
|1542242741796|2018-11-15 00:45:...|
|1542253449796|2018-11-15 03:44:...|
|1542260935796|2018-11-15 05:48:...|
+-------------+--------------------+
only showing top 5 rows



In [169]:
#start_time, hour, day, week, month, year, weekday
time_table = df.select("start_time").dropDuplicates() \
.withColumn("hour", hour("start_time")).withColumn("day", dayofmonth("start_time")) \
.withColumn("week", weekofyear("start_time")).withColumn("month", month("start_time")) \
.withColumn("year", year("start_time")).withColumn("weekday", date_format("start_time", 'E'))

In [170]:
time_table.write.parquet(output_data + "time/")

In [188]:
#songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
#songs_df = spark.read.parquet(output_data + 'songs/*/*')
songs_df = spark.read\
                .format("parquet")\
                .option("basePath", os.path.join(output_data, "songs/"))\
                .load(os.path.join(output_data, "songs/*/*/"))

In [189]:
songsplays_table = df.join(songs_df, df.song == songs_df.title, how='inner')\
.select(monotonically_increasing_id().alias("songplay_id"),"start_time",col("userId").alias("user_id"), \
        "level","song_id", "artist_id", col("sessionId").alias("session_id"), "location", col("userAgent").alias("user_agent")).show(5)

+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|          0|2018-11-21 21:56:...|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
|          1|2018-11-14 05:06:...|     10| free|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|
|          2|2018-11-19 09:14:...|     24| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       672|Lake Havasu City-...|"Mozilla/5.0 (Win...|
|          3|2018-11-27 22:35:...|     80| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|
+-----------+-------