In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, monotonically_increasing_id

# Load AWS credentials from dl.cfg and export them as environment variables.
config = configparser.ConfigParser()
# ConfigParser.read() skips files it cannot open and returns the list of files
# it actually parsed, so an empty result means dl.cfg was not found/readable.
if not config.read('dl.cfg'):
    raise FileNotFoundError("dl.cfg could not be read; it must define AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY")

os.environ['AWS_ACCESS_KEY_ID'] = config['DEFAULT']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['DEFAULT']['AWS_SECRET_ACCESS_KEY']

# Drafts

In [2]:
# Draft session: request the hadoop-aws package via spark.jars.packages and
# reuse an already-running session if there is one.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [None]:
# Alternative input root (disabled): read from the S3 bucket instead.
#input_data = "s3a://udacity-dend"
# Root path the song/log JSON paths below are built from (relative local path).
input_data = "data"

# Root S3A URI for output.
output_data = "s3a://pazeto-bucket-nd"

In [3]:


# get filepath to song data file
# (the "/" separator is required: input_data has no trailing slash)
song_data = "{}/song_data/*.json".format(input_data)
print(song_data)
# read song data file
df = spark.read.json(song_data)

data/song_data/*.json


In [4]:
# printSchema() writes the schema itself and returns None, so it must not be wrapped in print()
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

None


In [None]:
# get filepath to log data file
# (the "/" separator is required: input_data has no trailing slash)
log_data = "{}/log_data".format(input_data)
print(log_data)
# read log data file
df = spark.read.json(log_data)

# Real methods

In [4]:
def create_spark_session():
    """Return a SparkSession configured with the hadoop-aws 2.7.0 package.

    Uses ``getOrCreate``, so an existing session is returned when one is
    already active.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

In [6]:
def process_song_data(spark, input_data, output_data):
    """Build the songs and artists tables from the raw song JSON.

    Reads ``<input_data>/song_data`` and writes two parquet datasets under
    ``output_data``, each overwriting the result of any previous run:

    * ``songs.parquet``   - distinct song rows, partitioned by year and artist_id
    * ``artists.parquet`` - distinct artist rows

    Args:
        spark: active SparkSession used for reading and writing.
        input_data: root path/URI that contains the ``song_data`` directory.
        output_data: root path/URI the parquet datasets are written to.
    """
    # get filepath to song data file
    song_data = "{}/song_data".format(input_data)

    # read song data file
    df = spark.read.json(song_data)

    # extract columns to create songs table
    songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration').dropDuplicates()

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.partitionBy('year', 'artist_id').parquet(
        os.path.join(output_data, 'songs.parquet'), mode='overwrite')

    # extract columns to create artists table; the source has one record per
    # song, so identical artist rows are collapsed before writing
    artists_table = df.select(
        'artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude'
    ).dropDuplicates()

    # write artists table to parquet files
    artists_table.write.parquet(os.path.join(output_data, 'artists.parquet'), mode='overwrite')

In [18]:
def process_log_data(spark, input_data, output_data):
    """Build the users, time and songplays tables from the raw log JSON.

    Reads ``<input_data>/log_data`` and writes three parquet datasets under
    ``output_data``, each overwriting the result of any previous run:

    * ``users.parquet``     - distinct user attribute rows
    * ``time.parquet``      - distinct event times split into calendar parts,
                              partitioned by year and month
    * ``songplays.parquet`` - log events matched to a song by title, with a
                              generated ``songplay_id``, partitioned by the
                              year and month of the event

    The songs table is read back from ``<output_data>/songs.parquet``, so
    ``process_song_data`` must already have been run against the same
    ``output_data``.

    Args:
        spark: active SparkSession used for reading and writing.
        input_data: root path/URI that contains the ``log_data`` directory.
        output_data: root path/URI the parquet datasets are written to.
    """
    # get filepath to log data file
    log_data = "{}/log_data".format(input_data)

    # read log data file
    df = spark.read.json(log_data)

    # NOTE(review): the original comment here promised a filter for song-play
    # actions, but no filter was ever applied. The log schema is not visible
    # from this notebook - presumably a page/action column should be filtered
    # on; confirm against the data before adding it.

    # extract columns for users table
    users_table = df.select('userId', 'firstName', 'lastName', 'gender', 'level').dropDuplicates()

    # write users table to parquet files
    users_table.write.parquet(os.path.join(output_data, 'users.parquet'), mode='overwrite')

    # create datetime column from original timestamp column
    # (ts is divided by 1000 before fromtimestamp, i.e. treated as milliseconds)
    get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000.0)))
    df = df.withColumn("datetime", get_datetime(df.ts))

    # extract columns to create time table; one row per distinct event time
    time_table = df.select(
        col('datetime').alias('start_time'),
        hour('datetime').alias('hour'),
        dayofmonth('datetime').alias('day'),
        weekofyear('datetime').alias('week'),
        month('datetime').alias('month'),
        year('datetime').alias('year')
    ).dropDuplicates()

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy('year', 'month').parquet(
        os.path.join(output_data, 'time.parquet'), mode='overwrite')

    # read in song data to use for songplays table
    song_df = spark.read.parquet(os.path.join(output_data, 'songs.parquet'))

    # extract columns from joined song and log datasets to create songplays table
    # NOTE(review): the match is on song title only, so different songs that
    # share a title will all match - confirm whether that is acceptable.
    joined_df = df.join(song_df, song_df.title == df.song)
    songplays_table = joined_df.select(
        # the id must be part of the written frame; computing it in a separate
        # select() and discarding the result leaves the table without it
        monotonically_increasing_id().alias('songplay_id'),
        'ts', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent',
        # partition keys come from the event time, not from the song's own
        # `year` column that the join brings in
        year('datetime').alias('year'),
        month('datetime').alias('month')
    )

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy('year', 'month').parquet(
        os.path.join(output_data, 'songplays.parquet'), mode='overwrite')

In [10]:
# Session passed to both processing functions below.
spark = create_spark_session()

# Root path the functions read song_data/ and log_data/ from (relative local path).
input_data = "data"

# Root S3A URI the functions write their parquet datasets under.
output_data = "s3a://pazeto-bucket-nd"

In [11]:
# Writes songs.parquet and artists.parquet; must run before process_log_data, which reads songs.parquet back.
process_song_data(spark, input_data, output_data)

In [19]:
# Writes users.parquet, time.parquet and songplays.parquet; reads songs.parquet from output_data.
process_log_data(spark, input_data, output_data)

+--------------------+----+---+----+-----+----+
|          start_time|hour|day|week|month|year|
+--------------------+----+---+----+-----+----+
|2018-11-15 00:30:...|   0| 15|  46|   11|2018|
|2018-11-15 00:41:...|   0| 15|  46|   11|2018|
|2018-11-15 00:45:...|   0| 15|  46|   11|2018|
|2018-11-15 01:57:...|   1| 15|  46|   11|2018|
|2018-11-15 03:29:...|   3| 15|  46|   11|2018|
+--------------------+----+---+----+-----+----+
only showing top 5 rows

None


NameError: name 'monotonically_increasing_id' is not defined