In [104]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, DataType
import pyspark.sql.functions as f
# from pyspark.sql.functions import udf, col
# from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [108]:
config = configparser.ConfigParser()
config.read('/Users/brad/.aws/credentials')

os.environ['AWS_ACCESS_KEY_ID']=config['default']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['default']['AWS_SECRET_ACCESS_KEY']

# AWS_ACCESS_KEY_ID = config.get('default','AWS_ACCESS_KEY_ID')
# AWS_SECRET_ACCESS_KEY = config.get('default','AWS_SECRET_ACCESS_KEY')

In [109]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [110]:
# sc=spark.sparkContext
# hadoop_conf=sc._jsc.hadoopConfiguration()

# hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
# hadoop_conf.set("fs.s3.awsAccessKeyId", AWS_ACCESS_KEY_ID)
# hadoop_conf.set("fs.s3.awsSecretAccessKey", AWS_SECRET_ACCESS_KEY)

In [111]:
input_data = "s3://brad-data-01/demo-data/udacity-data/"
output_data = "s3://brad-data-01/demo-data/udacity-data/output_data/"

In [112]:
# get filepath to song data file
song_data = input_data + "song_data/*/*/*/*"

df = spark.read.json(song_data).drop_duplicates()
df.show(5)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARPFHN61187FB575F6|       41.88415|         Chicago, IL|       -87.63241|         Lupe Fiasco|279.97995|        1|SOWQTQZ12A58A7B63E|Streets On Fire (...|   0|
|AR1Y2PT1187FB5B9CE|       27.94017|             Brandon|       -82.32547|         John Wesley|484.62322|        1|SOLLHMX12AB01846DC|   The Emperor Falls|   0|
|AR7G5I41187FB4CE6C|           null|     London, England|            null|            Adam Ant|233.40363|        1|SONHOTT12A8C13493C|     Something Girls|1982|
|AR10USD1187B99F3F1|           nul

In [31]:
# extract columns to create songs table
songs_table = df.select("song_id","title","artist_id","year","duration").drop_duplicates()
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOTTDKS12AB018D69B|It Wont Be Christmas|ARMBR4Y1187B9990EB|   0|241.47546|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOIAZJW12AB01853F1|          Pink World|AR8ZCNI1187B9A069B|1984|269.81832|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [32]:
songs_table.write.parquet(output_data + "songs/", mode="overwrite", partitionBy=["year","artist_id"])

In [33]:
artists_table = df.select("artist_id","artist_name","artist_location","artist_latitude","artist_longitude").drop_duplicates()
artists_table.show(5)

+------------------+---------------+---------------+---------------+----------------+
|         artist_id|    artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+---------------+---------------+---------------+----------------+
|AR3JMC51187B9AE49D|Backstreet Boys|    Orlando, FL|       28.53823|       -81.37739|
|AR0IAWL1187B9A96D0|   Danilo Perez|         Panama|         8.4177|       -80.11278|
|ARWB3G61187FB49404|    Steve Morse| Hamilton, Ohio|           null|            null|
|AR47JEX1187B995D81|   SUE THOMPSON|     Nevada, MO|       37.83721|       -94.35868|
|ARHHO3O1187B989413|      Bob Azzam|               |           null|            null|
+------------------+---------------+---------------+---------------+----------------+
only showing top 5 rows



In [34]:
artists_table.write.parquet(output_data + "artists/", mode="overwrite")

## Log_Data Function

In [47]:
# get filepath to log data file
log_data = input_data + "log_data/*/*/*"

dfLog = spark.read.json(log_data).drop_duplicates()
dfLog.show(5)

+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|      Fat Joe|Logged In|     Kate|     F|           21| Harrell|241.34485| paid|Lansing-East Lans...|   PUT|NextSong|1.540472624796E12|      605|Safe 2 Say [The I...|   200|1542296032796|"Mozilla/5.0 (X11...|    97|
|  Linkin Park|Logged In|     Kate|     F|           33| Harrell|259.86567| paid|Lansing-East Lans...|   PUT|NextSong|1.540472624796

In [49]:
# filter by actions for song plays
dfLog = dfLog.filter(dfLog.page == "NextSong")
dfLog.show(5)

+------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            artist|     auth|firstName|gender|itemInSession| lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|           Fat Joe|Logged In|     Kate|     F|           21|  Harrell|241.34485| paid|Lansing-East Lans...|   PUT|NextSong|1.540472624796E12|      605|Safe 2 Say [The I...|   200|1542296032796|"Mozilla/5.0 (X11...|    97|
|       Linkin Park|Logged In|     Kate|     F|           33|  Harrell|259.86567| paid|Lansing-East Lans...|

In [51]:
# extract columns for users table
users_table = dfLog.select("userId","firstName","lastName","gender","level").drop_duplicates()
users_table.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    57|Katherine|     Gay|     F| free|
|    84|  Shakira|    Hunt|     F| free|
|    22|     Sean|  Wilson|     F| free|
|    52| Theodore|   Smith|     M| free|
|    80|    Tegan|  Levine|     F| paid|
+------+---------+--------+------+-----+
only showing top 5 rows



In [52]:
# write users table to parquet files
users_table.write.parquet(os.path.join(output_data, "users/") , mode="overwrite")

In [62]:
# create timestamp column from original timestamp column
get_timestamp = f.udf(lambda x : datetime.utcfromtimestamp(int(x)/1000), TimestampType())
dfLog = dfLog.withColumn("start_time", get_timestamp("ts"))
dfLog.printSchema()
dfLog.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)

+------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+
|            a

In [75]:
# create datetime column from original timestamp column
get_datetime = f.udf(lambda x : datetime.utcfromtimestamp(int(x)/1000), DataType())
dfLog = dfLog.withColumn("datetime", get_datetime("ts"))
dfLog.printSchema()
dfLog.show(5)

IllegalArgumentException: Failed to convert the JSON string 'data' to a data type.

In [70]:
# extract columns to create time table
time_table = dfLog.withColumn("hour",f.hour("start_time"))\
                .withColumn("day",f.dayofmonth("start_time"))\
                .withColumn("week",f.weekofyear("start_time"))\
                .withColumn("month",f.month("start_time"))\
                .withColumn("year",f.year("start_time"))\
                .withColumn("weekday",f.dayofweek("start_time"))\
                .select("start_time","hour", "day", "week", "month", "year", "weekday").drop_duplicates()
time_table.show(5)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-30 04:24:...|   4| 30|  48|   11|2018|      6|
|2018-11-30 13:59:...|  13| 30|  48|   11|2018|      6|
|2018-11-12 23:20:...|  23| 12|  46|   11|2018|      2|
|2018-11-28 14:21:...|  14| 28|  48|   11|2018|      4|
|2018-11-28 23:38:...|  23| 28|  48|   11|2018|      4|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [76]:
# write time table to parquet files partitioned by year and month
time_table.write.parquet(os.path.join(output_data, "time_table/"), mode='overwrite', partitionBy=["year","month"])
# time_table.write.parquet(output_data + "time_table/", mode='overwrite', partitionBy=["year","month"])

In [77]:
# read in song data to use for songplays table
song_df = spark.read\
            .format("parquet")\
            .option("basePath", os.path.join(output_data, "songs/"))\
            .load(os.path.join(output_data, "songs/*/*/"))
song_df.show(5)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...| 43.36281|2000|ARPBNLO1187FB3D52F|
|SONYPOM12A8C13B2D7|I Think My Wife I...|186.48771|2005|ARDNS031187B9924F0|
|SODREIN12A58A7F2E5|A Whiter Shade Of...|326.00771|   0|ARLTWXK1187FB5A3F8|
|SOYMRWW12A6D4FAB14|The Moon And I (O...| 267.7024|   0|ARKFYS91187B98E58F|
|SOWQTQZ12A58A7B63E|Streets On Fire (...|279.97995|   0|ARPFHN61187FB575F6|
+------------------+--------------------+---------+----+------------------+
only showing top 5 rows



In [82]:
# extract columns from joined song and log datasets to create songplays table
songplays_table = dfLog.join(song_df, dfLog.song == song_df.title, how='inner')\
                        .select(f.monotonically_increasing_id().alias("songplay_id"),f.col("start_time"),f.col("userId").alias("user_id"),"level","song_id","artist_id", f.col("sessionId").alias("session_id"), "location", f.col("userAgent").alias("user_agent"))
songplays_table.show(5)


+-------------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|  songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-------------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
| 188978561024|2018-11-21 21:56:...|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
| 584115552256|2018-11-19 09:14:...|     24| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       672|Lake Havasu City-...|"Mozilla/5.0 (Win...|
| 944892805120|2018-11-27 22:35:...|     80| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|
|1056561954816|2018-11-14 05:06:...|     10| free|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       484|Washington-Arling...|"Mozilla/5.0 (Mac...|
+-----

In [87]:
songplays_table = songplays_table.join(time_table, songplays_table.start_time == time_table.start_time, how="inner")\
                        .select("songplay_id", songplays_table.start_time, "user_id", "level", "song_id", "artist_id", "session_id", "location", "user_agent", songplays_table.year, songplays_table.month)
songplays_table.show(5)

+-------------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|  songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|year|month|
+-------------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
| 188978561024|2018-11-21 21:56:...|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
| 584115552256|2018-11-19 09:14:...|     24| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       672|Lake Havasu City-...|"Mozilla/5.0 (Win...|2018|   11|
| 944892805120|2018-11-27 22:35:...|     80| paid|SOGDBUF12A8C140FAA|AR558FS1187FB45658|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|2018|   11|
|1056561954816|2018-11-14 05:06:...|     10| free|SOGDBUF12A8C140FAA|AR558FS1187FB

In [88]:
 # write songplays table to parquet files partitioned by year and month
songplays_table.drop_duplicates().write.parquet(os.path.join(output_data, "songplays/"), mode="overwrite", partitionBy=["year","month"])


In [16]:
def process_song_data(spark, input_data, output_data):
    # get filepath to song data file
    song_data = input_data + "song_data/*/*/*/*"
    song_data
    # read song data file
    df = spark.read.json(song_data)
    df.show(5)

    # extract columns to create songs table
    # songs_table = 
    
    # write songs table to parquet files partitioned by year and artist
    # songs_table

    # extract columns to create artists table
    # artists_table = 
    
    # write artists table to parquet files
    # artists_table



In [17]:
process_song_data(spark, input_data, output_data)

Py4JJavaError: An error occurred while calling o34.json.
: java.io.IOException: No FileSystem for scheme: s3
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:376)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
	at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:477)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
