In [None]:
from pyspark import SparkContext
import json
import time

# Local Spark context backed by 4 worker threads; 'top20Song' is the
# application name shown in the Spark UI and logs.
sc = SparkContext(master='local[4]', appName='top20Song')
# Keep notebook output clean: only FATAL-level log records are emitted.
sc.setLogLevel('FATAL')

# High-resolution reference point for the elapsed-time report.
start = time.perf_counter()

# Read the text files at this S3 location, one RDD element per line.
# textFile() is lazy: nothing is fetched until an action runs, so the
# actual read cost lands inside the timed section.
input_bucket = "s3://a3ayush/"
lines = sc.textFile(input_bucket)

# Compute the 20 most frequently occurring songs:
#   1. Parse each input line as JSON. A malformed line raises and fails the
#      job; a JSON value that is not an object fails at .get() below.
#   2. Map each event to a (song, 1) pair using its 'song' field.
#   3. Drop events whose 'song' key is missing or null.
#   4. Sum the 1s per song to get its occurrence count.
#   5. Keep the 20 (song, count) pairs with the highest counts.
#
# takeOrdered() keeps only the best 20 candidates per partition and merges
# them on the driver. sortBy(...).take(20) would instead run a sampling job
# for range partitioning and shuffle/sort every distinct song just to keep 20.
# Ties on count are broken by the song name (compared as text), so the result
# no longer depends on how the data happens to be partitioned.
counter = (
    lines.map(json.loads)
         .map(lambda event: (event.get("song"), 1))
         .filter(lambda pair: pair[0] is not None)
         .reduceByKey(lambda a, b: a + b)
         .takeOrdered(20, key=lambda kv: (-kv[1], str(kv[0])))
)

# Report elapsed time since `start`. The Spark transformations and
# textFile() are lazy, so this interval includes reading the input and the
# whole aggregation triggered by the action that produced `counter`.
print('Time taken for entire job = {:.4f}s'.format(time.perf_counter() - start))
print('\nThe top 20 songs are:\nSong\t\tCount\n')

# Echo each (song, count) row to stdout and persist the same table to
# songs.txt. Both use one formatted row so console and file agree.
# UTF-8 is explicit because song titles are arbitrary Unicode text and the
# platform default encoding (e.g. cp1252 on Windows) may not represent them,
# which would raise UnicodeEncodeError mid-write.
# The 'with' statement closes the file even if writing fails.
with open('songs.txt', 'w', encoding='utf-8') as f:
    f.write("Song\t\tCount\n")
    for song, count in counter:
        row = '{}\t\t{}'.format(song, count)
        print(row)
        f.write(row + '\n')