In [8]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import StructType, FloatType, LongType, IntegerType, StringType, StructField


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

log_data = config['S3']['LOG_DATA']
song_data = config['S3']['SONG_DATA']
input_data = "s3a://udacity-dend/"
output_data = "s3a://alex-udacity-dlp-test/"

spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0").getOrCreate()

In [9]:
schema = StructType([
        StructField("artist", StringType()),
        StructField("auth", StringType()),
        StructField("firstName", StringType()),
        StructField("gender", StringType()),
        StructField("itemInSession", IntegerType()),
        StructField("lastName", StringType()),
        StructField("length", FloatType()),
        StructField("level", StringType()),
        StructField("location", StringType()),
        StructField("method", StringType()),
        StructField("page", StringType()),
        StructField("registration", LongType()),
        StructField("sessionId", IntegerType()),
        StructField("song", StringType()),
        StructField("status", IntegerType()),
        StructField("ts", LongType()),
        StructField("userAgent", StringType()),
        StructField("userId", IntegerType())
    ])    

In [10]:
df = spark.read.json(f"{input_data}{log_data}", schema)

In [11]:
df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,,,,,,,,,,,,,,,,,,
1,,,,,,,,,,,,,,,,,,
2,,,,,,,,,,,,,,,,,,
3,,,,,,,,,,,,,,,,,,
4,,,,,,,,,,,,,,,,,,


In [None]:
print((df.count(), len(df.columns)))

In [None]:
df = df.filter(df.page == "NextSong")

In [None]:
users_table = df.select(col('userId').alias('user_id'),
                            col('firstName').alias('first_name'),
                            col('lastName').alias('last_name'),
                            'gender',
                            'level').dropDuplicates(['user_id'])

In [None]:
users_table.limit(3).toPandas()

In [None]:
users_table = users_table.write.partitionBy('gender', 'level').parquet(f"{output_data}users_table.parquet", mode="overwrite")

In [None]:
df.printSchema()

In [None]:
df = df.filter(df.page == "NextSong")

In [None]:
from pyspark.sql.types import TimestampType, DateType

get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000), TimestampType())
df = df.withColumn('timestamp', get_timestamp(df.ts))

get_datetime = udf(lambda x: datetime.fromtimestamp(x/1000), DateType())
df = df.withColumn('datetime', get_datetime(df.ts))

In [None]:
time_table = df.select(col('timestamp').alias('start_time'),
                           hour('datetime').alias('hour'),
                           dayofmonth('datetime').alias('day'),
                           weekofyear('datetime').alias('week'),
                           month('datetime').alias('month'),
                           year('datetime').alias('year'),
                           date_format('datetime','E').alias('weekday'))

In [None]:
time_table.limit(5).toPandas()

In [None]:
time_table = time_table.write.parquet(f"{output_data}time_table.parquet", mode="overwrite")

In [None]:
f"{input_data}{song_data}"

In [None]:
# read song data file
df = spark.read.json(f"{input_data}{song_data}")

In [None]:
print((df.count(), len(df.columns)))

In [None]:
# extract columns to create songs table
songs_table = df.select(['song_id', 'title', 'artist_id', 'year', 'duration']).dropDuplicates(['song_id'])
    
# write songs table to parquet files partitioned by year and artist
songs_table = songs_table.write.partitionBy('artist_id', 'year').parquet(f"{output_data}songs_table.parquet", mode="overwrite")

# extract columns to create artists table
artists_table = df.select('artist_id',
                          col('artist_name').alias('name'),
                          col('artist_location').alias('location'),
                          col('artist_latitude').alias('latitude'),
                          col('artist_longitude').alias('longitude')).dropDuplicates(['artist_id'])
    
# write artists table to parquet files
#artists_table = artists_table.write.partitionBy('location').parquet(f"{output_data}artists_table.parquet", mode="overwrite")