In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import  pyspark.sql.functions as F
import time

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [3]:
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [5]:
spark = create_spark_session()

In [6]:
spark

In [7]:
import boto3
bucket = 'udacity-dend'
#Make sure you provide / in the end
prefix = 'song_data/A/'  
file_locations = []

client = boto3.client('s3')
result = client.list_objects(Bucket=bucket, Prefix=prefix, Delimiter='/')
for o in result.get('CommonPrefixes'):
    file_locations.append(o.get('Prefix'))

print("# of file_locations:", len( file_locations))
file_locations = ["s3a://udacity-dend/" + s for s in file_locations]
file_locations = [ s + "*" for s in file_locations]

# of file_locations: 26


In [9]:
s3path = 'https://s3.console.aws.amazon.com/s3/buckets/udacity-dend/?region=us-west-2#'
s3pathsong = 'https://s3.console.aws.amazon.com/s3/buckets/udacity-dend/song_data/A/?region=us-west-2'


In [10]:
import boto3

start = time.time()
s3 = boto3.resource('s3')
udacity_bucket = s3.Bucket('udacity-dend')
prefix = 'song_data/A/'
songs_paths = []

for object_summary in udacity_bucket.objects.filter(Prefix=prefix):
    songs_paths.append(object_summary.key)

songs_paths = ["s3a://udacity-dend/" + s for s in songs_paths]
end = time.time()

print('runtime (s):', end-start)
print('Total number of `song_data` files:', len(songs_paths))

runtime (s): 10.252723693847656
Total number of `song_data` files: 14896


In [11]:
# Not sure that second line is faster than first
#df = spark.read.format("json").load("s3a://udacity-dend/song_data/*/*/*") # runs in 13 minutes
#df = spark.read.format("json").load(file_locations)
# df = spark.read.format("json").load(songs_paths)
# song_data = input_data+'song_data/*/*/*/*.json
# song_data = input_data + 'song_data/*/*/*/*.json # runs in 14 minutes
# df = spark.read.json(song_data) # runs in 14 minutes
# df = spark.read.json(songs_paths) # runs in 26 minutes

start = time.time()
input_data = "s3a://udacity-dend/"
song_data =  input_data + 'song_data/*/*/*/*.json'
df = spark.read.format("json").load("s3a://udacity-dend/song_data/*/*/*") # runs in 13 minutes
end = time.time()
print('runtime (s):', end-start)

runtime (s): 832.633446931839


In [12]:
print(type(df))
print(df)
print('count', df.count())
#df.printSchema()

<class 'pyspark.sql.dataframe.DataFrame'>
DataFrame[artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: bigint, song_id: string, title: string, year: bigint]
count 14896


In [11]:
#df.show(5)

In [None]:
#df = spark.read.json("s3a://udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json")

In [None]:
#df = spark.read.json("s3a://udacity-dend/song_data/")

In [None]:
#song_data = "s3a://udacity-dend/song_data/"
#df = spark.read.load(song_data)

In [12]:
#df_songs = df.withColumn("songplay_id", col("song_id"))
#df_songs.show(5)

In [16]:
songs_table = df.select("song_id", "title", "artist_id", "year", "duration", col("artist_name").alias("artist")).distinct() # interesting...when I use .show(), i can't seem to write to a variable, almost as if its
# print statement

In [17]:
#songs_table.show(5) # TOO SLOW

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOLLALT12A8C1399F3|Piano Concerto No...|ARWMEJW11F4C83C123|   0|319.37261|
|SOAGZUH12A6D4FB4C5|The Sparrows And ...|AR4YEJU1187B991468|1991|191.26812|
|SOSJVZR12AB018698F|Four Simple Steps...|ARJ8IIY1187B992734|2003|  58.8273|
|SORAUDC12A6D4F7273|The Debt Is Settl...|ARJF4E41187FB499FF|1978|233.11628|
|SONXXYP12A81C238EE|Don't Waste My Ti...|ARZNYC61187B9958CF|1985|324.85832|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [18]:
#df.selectExpr("pos.metadata['song_id'] as songplay_id").limit(5).show(5)

In [19]:
'''
df.createOrReplaceTempView("songtable")
#spark.sql(""" CREATE TABLE IF NOT EXISTS songs (song_id STRING, title STRING, artist_id STRING, year INT, duration float)""")
spark.sql("""
    
    SELECT song_id, title, artist_id, year, duration 
    FROM songtable
""").show()
'''

'\ndf.createOrReplaceTempView("songtable")\n#spark.sql(""" CREATE TABLE IF NOT EXISTS songs (song_id STRING, title STRING, artist_id STRING, year INT, duration float)""")\nspark.sql("""\n    \n    SELECT song_id, title, artist_id, year, duration \n    FROM songtable\n""").show()\n'

In [13]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://dend-sparkify-amiri/"
#output_data = "local_folder_name"
#output_data = r"C:\Users\amiri\Documents\GitHub\Data_Engineering_Data_Lake_with_Spark"

In [15]:
# extract columns to create songs table
start = time.time()
songs_table = df.select("song_id", "title", "artist_id", "year", "duration").distinct()
end = time.time()
print('extract columns to create songs table runtime (s):', end-start)

# write songs table to parquet files partitioned by year and artist
start = time.time()
songs_table_parquet = df.select("song_id", "title", "artist_id", "year", "duration", col("artist_name").alias("artist")).distinct()
songs_table_parquet.write.partitionBy("year", "artist").parquet(output_data + "songs/")
end = time.time()
print('write songs table to parquet files runtime (s):', end-start)

extract columns to create songs table runtime (s): 0.031922340393066406


Py4JJavaError: An error occurred while calling o92.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:829)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 6.0 failed 1 times, most recent failure: Lost task 2.0 in stage 6.0 (TID 12542, DESKTOP-GJKKHSO, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
	at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:187)
	at org.apache.hadoop.util.DiskChecker.checkDirAccess(DiskChecker.java:174)
	at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:108)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:285)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
	at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
	at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150)
	at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:241)
	at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:262)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:273)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
	... 9 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
	... 33 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	... 1 more
Caused by: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
	at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:187)
	at org.apache.hadoop.util.DiskChecker.checkDirAccess(DiskChecker.java:174)
	at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:108)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:285)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
	at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
	at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150)
	at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.newOutputWriter(FileFormatDataWriter.scala:241)
	at org.apache.spark.sql.execution.datasources.DynamicPartitionDataWriter.write(FileFormatDataWriter.scala:262)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:273)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
	... 9 more


In [None]:
# extract columns to create artists table 
start = time.time()
artists_table = df.select("artist_id", "artist_name", "location", "latitidue", "longitude").distinct()
end = time.time()
print('extract columns to create artists table runtime (s):', end-start)

# write artists table to parquet files
start = time.time()
artists_table.write.parquet("artists_table_parquet.parquet")
end = time.time()
print('write artists table to parquet files runtime (s):', end-start)

## Put finalized code from above in sections below, then transfer to .py file. 

In [None]:
def process_song_data(spark, input_data, output_data):
    '''
    This function reads the data from S3, processes the song data using Spark, and writes the processed data back to S3.
    Parameters:
    spark: The spark session
    input_data: The S3 path location up to, but not including `song_data`
    output_data: The S3 bucket where the new dimensional tables will be written to
    '''
    # get filepath to song data file
    start = time.time()
    song_data =  input_data + 'song_data/*/*/*/*.json'
    end = time.time()
    print('runtime (s):', end-start)
    
    # read song data file
    start = time.time()
    df = spark.read.json(song_data) # may not need this new schema
    end = time.time()
    print('read song data file runtime (s):', end-start)

    # extract columns to create songs table
    start = time.time()
    songs_table = df.select("song_id", "title", "artist_id", "year", "duration").distinct()
    end = time.time()
    print('extract columns to create songs table runtime (s):', end-start)

    # write songs table to parquet files partitioned by year and artist
    start = time.time()
    songs_table_parquet = df.select("song_id", "title", "artist_id", "year", "duration", col("artist_name").alias("artist")).distinct()
    songs_table_parquet = songs_table_parquet.write.partitionBy("year", "artist").parquet("song_table_parquet.parquet")
    end = time.time()
    print('write songs table to parquet files runtime (s):', end-start)

    # extract columns to create artists table
    start = time.time()
    artists_table = df.select("artist_id", "artist_name", "location", "latitidue", "longitude").distinct()
    end = time.time()
    print('extract columns to create artists table runtime (s):', end-start)

    # write artists table to parquet files
    start = time.time()
    artists_table_parquet = artists_table.write.parquet("artists_table_parquet.parquet")
    end = time.time()
    print('write artists table to parquet files runtime (s):', end-start)

In [None]:
def process_log_data(spark, input_data, output_data):
   '''
    This function reads the data from S3, processes the log data using Spark, and writes the processed data back to S3.
   '''
    # get filepath to log data file
    log_data = 's3a://udacity-dend/log_data/'

    # read log data file
    df = 
    
    # filter by actions for song plays
    df = 

    # extract columns for users table    
    artists_table = 
    
    # write users table to parquet files
    artists_table

    # create timestamp column from original timestamp column
    get_timestamp = udf()
    df = 
    
    # create datetime column from original timestamp column
    get_datetime = udf()
    df = 
    
    # extract columns to create time table
    time_table = 
    
    # write time table to parquet files partitioned by year and month
    time_table

    # read in song data to use for songplays table
    song_df = 

    # extract columns from joined song and log datasets to create songplays table 
    songplays_table = 

    # write songplays table to parquet files partitioned by year and month
    songplays_table

In [None]:
def main():
    spark = create_spark_session()
    input_data = "s3a://udacity-dend/"
    output_data = "s3a://dend-sparkify-amiri/output_data"
    
    process_song_data(spark, input_data, output_data)    
    process_log_data(spark, input_data, output_data)

In [None]:
if __name__ == "__main__":
    main()

In [None]:
# SANDBOX AREA BELOW ############

In [None]:
'''
# This code is from the internet, and it works
from pyspark import SparkContext
sc = SparkContext(master = 'local')
create_spark_session()
'''

In [None]:
# Works! Stops the spark context
SparkSession.stop()

In [None]:
# !pip install boto3
# Also had to install Java SDK

In [None]:
# Link to udacity bucket with the console:  https://s3.console.aws.amazon.com/s3/buckets/udacity-dend/?region=us-west-2