In [1]:
import configparser
from datetime import datetime
import os
from os import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType

In [2]:
# Load AWS credentials from the local config file and export them as
# environment variables (both keys are read from the [AWS] section).
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so check it explicitly instead of failing later
# with an opaque KeyError on the 'AWS' section.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Config file 'dl.cfg' was not found or could not be read")

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']


In [3]:
# Initialize the Spark session; the hadoop-aws package is requested so the
# s3a:// filesystem is available.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [21]:
# Input locations in the S3 bucket (only a subset of the song data and the
# November 2018 logs are matched by these globs).
song_data = "s3a://udacity-dend/song_data/A/B/C/*.json"
log_data = "s3a://udacity-dend/log_data/2018/11/*.json"

# Load both JSON datasets; the schemas are inferred by Spark.
df_song = spark.read.format("json").load(song_data)
df_long = spark.read.format("json").load(log_data)

In [22]:
# Inspect the inferred schema of the song dataset.
df_song.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [23]:
# Inspect the inferred schema of the log (event) dataset.
df_long.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [24]:
# SONGS table: distinct (song_id, title, artist_id, year, duration) rows.
songs_table = (
    df_song
    .select("song_id", "title", "artist_id", "year", "duration")
    .distinct()
)
# Preview a single row.
songs_table.show(1)

+------------------+-----+------------------+----+--------+
|           song_id|title|         artist_id|year|duration|
+------------------+-----+------------------+----+--------+
|SOQFYBD12AB0182188|Intro|ARAADXM1187FB3ECDB|1999|67.63057|
+------------------+-----+------------------+----+--------+
only showing top 1 row



In [25]:
# ARTISTS table.
# Each input row carries a song_id as well as the artist fields, so the same
# artist can appear in many rows; de-duplicate the projected rows, as is done
# for the songs and users tables.
artists_table = df_song.selectExpr(
    "artist_id",
    "artist_name name",
    "artist_location location",
    "artist_latitude latitude",
    "artist_longitude longitude",
).distinct()
# Preview a single row.
artists_table.show(1)

+------------------+-----------+--------------+--------+---------+
|         artist_id|       name|      location|latitude|longitude|
+------------------+-----------+--------------+--------+---------+
|ARLTWXK1187FB5A3F8|King Curtis|Fort Worth, TX|32.74863|-97.32925|
+------------------+-----------+--------------+--------+---------+
only showing top 1 row



In [26]:
# LOG data: keep only the events whose page is "NextSong".
df_long = df_long.where(col("page") == "NextSong")

In [27]:
# USERS table: distinct user rows with snake_case column names.
# NOTE(review): `level` is part of the distinct key, so a user who appears
# with more than one level yields more than one row.
users_table = df_long.select(
    col("userId").alias("user_id"),
    col("firstName").alias("first_name"),
    col("lastName").alias("last_name"),
    col("gender"),
    col("level"),
).distinct()
# Preview a single row.
users_table.show(1)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
+-------+----------+---------+------+-----+
only showing top 1 row



In [28]:
# Convert the epoch-millisecond `ts` column to a Spark timestamp.
def get_datetime(ts_ms):
    """Return a timestamp Column built from a Column of epoch milliseconds.

    A native cast is used instead of a Python UDF around
    ``datetime.fromtimestamp``: it avoids the round trip through a naive
    local-time datetime (which is ambiguous during a DST fall-back hour),
    propagates nulls instead of raising, and runs inside the JVM.

    Args:
        ts_ms: Column holding milliseconds since the Unix epoch.

    Returns:
        Column of TimestampType for the same instant.
    """
    # Casting a numeric value to timestamp interprets it as seconds since the epoch.
    return (ts_ms / 1000).cast(TimestampType())


df_long = df_long.withColumn("start_time", get_datetime(df_long.ts))

In [30]:
# Distinct play timestamps; these are the rows of the TIME table.
table_time = df_long.select(col("start_time")).distinct()
table_time

DataFrame[start_time: timestamp]

In [31]:
# TIME table: break each start_time down into its calendar components.
start_ts = col("start_time")
table_time = table_time.select(
    start_ts,
    hour(start_ts).alias("hour"),
    dayofmonth(start_ts).alias("day"),
    weekofyear(start_ts).alias("week"),
    month(start_ts).alias("month"),
    year(start_ts).alias("year"),
    dayofweek(start_ts).alias("weekday"),
)

In [33]:
# Display the TIME table's column names and types (no Spark job is triggered).
table_time

DataFrame[start_time: timestamp, hour: int, day: int, week: int, month: int, year: int, weekday: int]

In [34]:
# Join log events to song metadata to build the SONGPLAYS table.
# An event matches a song when both the artist name and the song title agree;
# events without a match are dropped by the inner join.
cond = (df_long.artist == df_song.artist_name) & (df_long.song == df_song.title)
df_join = df_long.join(df_song, on=cond, how="inner")

In [35]:
# Add a surrogate key to the SONGPLAYS rows.
# NOTE(review): per the PySpark docs, monotonically_increasing_id() yields
# unique, increasing 64-bit ids that are not guaranteed to be consecutive.
df_join = df_join.withColumn("songplay_id", monotonically_increasing_id())

In [36]:
# SONGPLAYS table: project the joined rows onto the final snake_case columns.
song_play_table = df_join.select(
    col("songplay_id"),
    col("start_time"),
    col("userId").alias("user_id"),
    col("level"),
    col("song_id"),
    col("artist_id"),
    col("sessionId").alias("session_id"),
    col("location"),
    col("userAgent").alias("user_agent"),
)
song_play_table

DataFrame[songplay_id: bigint, start_time: timestamp, user_id: string, level: string, song_id: string, artist_id: string, session_id: bigint, location: string, user_agent: string]

In [37]:
# Save the tables as parquet files.
path = "s3a://helaine-data-lake/parquet_files/"
parquet = "song_table.parquet"
# SONGS table: partitioned by year and artist_id, replacing any previous output.
(
    songs_table.write
    .mode("overwrite")
    .partitionBy("year", "artist_id")
    .parquet(path + parquet)
)

In [42]:
# Output locations for the remaining tables, built from the base `path`.
# `path` must already be the S3 prefix string assigned in the save cell.
path_user = path + "user_table.parquet"
path_time = path + "time_table.parquet"
# NOTE(review): path_songPlay is not used by any write in the visible cells.
path_songPlay = path + "songplay_table.parquet"

In [40]:
# Write the USERS table, replacing any previous output at that path.
users_table.write.mode("overwrite").parquet(path_user)

In [None]:
# Write the TIME table, replacing any previous output at that path.
table_time.write.mode("overwrite").parquet(path_time)