In [None]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

# more imports for converting ts to timestamp
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import to_timestamp, monotonically_increasing_id

# more imports for counting records
import pandas as pd
import numpy as np

In [None]:
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it did parse. Fail fast here instead of with a confusing NoSectionError
# when the credentials are looked up in the next cell.
if not config.read('dl.cfg'):
    raise FileNotFoundError("dl.cfg not found or unreadable in the working directory")

In [None]:
# Copy the AWS keys from the [USER] section of dl.cfg into the process environment.
# NOTE(review): presumably so the AWS/S3A client picks them up — confirm.
# Never print these values: the output would leak the secrets into the saved notebook.
for key_name in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[key_name] = config.get('USER', key_name)

In [None]:
def create_spark_session():
    """Return the active SparkSession, creating it on first call.

    The builder adds the ``org.apache.hadoop:hadoop-aws:2.7.0`` package via
    ``spark.jars.packages`` so that ``s3a://`` paths can be used.
    NOTE(review): the hadoop-aws version should match the cluster's Hadoop version.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

In [None]:
spark = create_spark_session()

# Data locations: Udacity's JSON input bucket and this project's Parquet output bucket.
input_data, output_data = "s3a://udacity-dend/", "s3a://project4-spark/"

In [None]:
# Step 1: path glob for the song JSON files.
# Only the song-data/A/A/A/ prefix is read, i.e. a subset of the full dataset.
song_data = "{}song-data/A/A/A/*.json".format(input_data)

In [None]:
# Step 2: load the song JSON files into a DataFrame.
df = spark.read.format("json").load(song_data)

In [None]:
# Step 3: extract columns to create the songs table.
# `title` was missing from the original extraction. Deduplicate on song_id so the
# table's key stays unique even if a song occurs in more than one input file.
songs_table = (
    df.select('song_id', 'title', 'artist_id', 'year', 'duration')
    .dropDuplicates(['song_id'])
)

In [None]:
# Step 4: write songs table to parquet files partitioned by year and artist.
# mode("overwrite") lets the notebook be re-run: the default save mode raises an
# error when the output path already exists.
songs_table.write.mode("overwrite").partitionBy('year', 'artist_id').parquet(output_data + "songs")

In [None]:
# Step 5: extract columns to create the artists table.
# df has one row per song, and every song row repeats its artist's columns, so an
# artist with several songs would otherwise appear several times.
artists_table = (
    df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude')
    .dropDuplicates(['artist_id'])
)

In [None]:
# Step 6: write artists table to parquet files.
# mode("overwrite") so a re-run does not fail on the existing output path.
artists_table.write.mode("overwrite").parquet(output_data + "artists")

In [None]:
# Step 1: path glob for the log JSON files (three directory levels below log-data/).
log_data = "{}log-data/*/*/*.json".format(input_data)

In [None]:
# Step 2: load the log JSON files into a DataFrame.
df = spark.read.format("json").load(log_data)

# Quick look at the raw events. A Spark DataFrame has neither `.shape` nor `len()`;
# `.count()` is the way to get the row total.
print(df)
df.show(5)
df.count()

In [None]:
# Step 3: keep only song-play events (page == 'NextSong').
df = df.where(df["page"] == 'NextSong')

print(df)
df.show(5)
df.count()

In [None]:
# Step 4: extract columns for the users table.
# After the NextSong filter, df holds one row per play event, so each user appears
# once per song played. Drop exact duplicate rows.
# TODO(review): a user whose `level` changed (e.g. free -> paid) still gets one row
# per level; decide which one the dimension table should keep.
users_table = (
    df.select(
        col("userId").alias("user_id"),
        col("firstName").alias("first_name"),
        col("lastName").alias("last_name"),
        "gender",
        "level",
    )
    .dropDuplicates()
)

users_table.printSchema()
users_table.show(5)

In [None]:
# Step 5: write users table to parquet files.
# mode("overwrite") so a re-run does not fail on the existing output path.
users_table.write.mode("overwrite").parquet(output_data + "users")

In [None]:
# Step 6: pattern for rendering timestamps as text (e.g. with date_format()).
# In Spark datetime patterns `MM` is month-of-year and `mm` is minute. The former
# "HH:MM:ss" wrote the month where the minutes belong, so minutes were lost.
tsFormat = "yyyy-MM-dd HH:mm:ss z"

# Step 7: convert ts to a proper timestamp. ts is divided by 1000, i.e. treated as
# epoch milliseconds, and cast directly. The former date_format -> to_timestamp
# string round-trip was lossy: it dropped sub-second precision and, with the bad
# pattern, the minutes.
time_table = df.withColumn('ts', (col('ts') / 1000).cast(TimestampType()))

time_table.printSchema()
time_table.show(5)

In [None]:
# Step 8: extract columns to create the time table.
# Built straight from the filtered log `df` rather than `time_table = time_table...`.
# The self-referential form fails on a second run because `ts` no longer exists.
# ts is divided by 1000 (epoch milliseconds) before the cast, as elsewhere here.
start_time_col = (col("ts") / 1000).cast(TimestampType()).alias("start_time")
time_table = (
    df.select(start_time_col)
    # several play events can share a start_time; keep one row per timestamp
    .dropDuplicates(["start_time"])
    .select(
        "start_time",
        hour("start_time").alias("hour"),
        dayofmonth("start_time").alias("day"),
        weekofyear("start_time").alias("week"),
        month("start_time").alias("month"),
        year("start_time").alias("year"),
        date_format("start_time", "E").alias("weekday"),  # abbreviated day name, e.g. "Mon"
    )
)

time_table.printSchema()
time_table.show(5)

In [None]:
# Step 9: write time table to parquet files partitioned by year and month.
# mode("overwrite") so a re-run does not fail on the existing output path.
time_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_data + "time")

In [None]:
# Step 10: reload the song data for the songplays join; `df` now holds the log
# events, not the songs.
song_df = spark.read.format("json").load(song_data)

print(song_df)
song_df.show(5)

In [None]:
# Step 11: join song and log datasets to create the songplays table.
# A play is matched to a song by title AND artist name. Joining on the artist alone
# paired every play with every song by that artist, multiplying the rows.
play_matches_song = (song_df.title == df.song) & (song_df.artist_name == df.artist)

songplays_table = (
    song_df.join(df, play_matches_song, how="inner")
    .withColumn("songplay_id", monotonically_increasing_id())
    # ts is divided by 1000 (epoch milliseconds) and cast directly; this avoids the
    # lossy string round-trip through the tsFormat pattern.
    .withColumn("start_time", (col("ts") / 1000).cast(TimestampType()))
    .select(
        "songplay_id",
        "start_time",
        col("userId").alias("user_id"),
        "level",
        "song_id",
        "artist_id",
        col("sessionId").alias("session_id"),
        # the play event's own location (like user_id, level and session_id),
        # not the artist's home location
        "location",
        col("userAgent").alias("user_agent"),  # snake_case like the other columns
        month(col("start_time")).alias("month"),
        year(col("start_time")).alias("year"),
    )
)

songplays_table.printSchema()
songplays_table.show(5)

In [None]:
# Step 12: write songplays table to parquet files partitioned by year and month.
# mode("overwrite") so a re-run does not fail on the existing output path.
songplays_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_data + "songplays")

In [None]:
# Sanity check: read the songplays table back.
# Point at the table root, built from output_data rather than a hard-coded bucket.
# A */*/*.parquet file glob bypasses Spark's partition discovery and would drop
# the year/month partition columns.
parquetFile = spark.read.parquet(output_data + "songplays")

parquetFile.printSchema()
parquetFile.show(5)