In [1]:
import findspark
findspark.init() 
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import *
import time

In [2]:
from pyspark.sql import SparkSession
# starting SparkSession
spark = SparkSession \
        .builder \
        .appName("RDD") \
        .config("spark.ui.port","4041") \
        .config("spark.executor.memory", "2g") \
        .config("spark.executor.cores", "2") \
        .config("spark.task.maxFailures", "8") \
        .getOrCreate()

# Read csvs files with inferschema variable

In [3]:
import csv
sc = spark.sparkContext

# Read artist.csv
artists = sc.textFile("hdfs://master:9000/files/artists.csv")\
            .map(lambda x : list(csv.reader([x], delimiter=',', quotechar='"'))[0])

# Read chart_artist_mapping.csv
chart_artist_mapping = sc.textFile("hdfs://master:9000/files/chart_artist_mapping.csv")\
                         .map(lambda x : list(csv.reader([x], delimiter=',', quotechar='"'))[0])

# Read charts.csv
charts = sc.textFile("hdfs://master:9000/files/charts.csv")\
           .map(lambda x : list(csv.reader([x], delimiter=',', quotechar='"'))[0])

# Read regions.csv
regions = sc.textFile("hdfs://master:9000/files/regions.csv")\
            .map(lambda x : list(csv.reader([x], delimiter=',', quotechar='"'))[0])

In [4]:
from operator import add
def toCSVLine(data):
    return ','.join(str(d) for d in data)

# Method SparkSQL with CSV file as input file

## Q1: Ποιο είναι το συνολικό πλήθος των streams που έχουν καταγραφεί για το τραγούδι με τίτλο “Shape of You”, σύμφωνα με τα top200 charts?
Ως αποτελέσμα να δωθεί μόνο ένας αριθμός με το πλήθος

In [None]:
q1_rdd = charts.filter(lambda x: x[1] == "Shape of You" and x[5] == "top200")\
               .map(lambda x: (1, (int(x[7]))))\
               .reduceByKey(lambda x, y : x + y)

In [None]:
start_time = time.time()
q1_rdd.map(toCSVLine).coalesce(1).saveAsTextFile('hdfs://master:9000/outputs/q1_rdd')
print("Execution Time for q1(rdd) is: ",time.time() - start_time)

## Q2: Για κάθε chart, να βρεθεί το τραγούδι με τον μεγαλύτερο μέσο χρόνο παραμονής (δείτε «Υποδείξεις») στην πρώτη θέση.
Ως αποτέλεσμα, αναμένονται δύο γραμμές, μία για κάθε chart στην μορφή:
όνομα_chart, όνομα_τραγουδιού, μέσος_χρόνος_παραμονής_θέση#1
Αναμενόμενο αποτέλεσμα στο viral50 chart viral50,Calma - Remix,24.985507

In [None]:
# Step 1: filtering over the case of rank == 1
# Step 2: tuple of (chart(x[5]), song_title(x[1]), 1) 
# Step 3: count over same keys 
# Step 4: find the average time -> tuple of (chart, song_title, avg_time)
# Step 5: convert the original RDD to a pair-RDD 
# Step 6: find the element with max(count) for chart

q2_rdd = charts.filter(lambda x: x[2] == '1')\
               .map(lambda x: ((x[5], x[1]), 1))\
               .reduceByKey(lambda x, y: x + y)

In [None]:
q2_rdd_max = q2_rdd.map(lambda x: (x[0][0], x[0][1], x[1]/69))\
                   .keyBy(lambda x: x[0])\
                   .reduceByKey(lambda x, y: x if x[2] >= y[2] else y)\
                   .values()

In [None]:
start_time = time.time()
q2_rdd_max.map(toCSVLine).coalesce(1).saveAsTextFile('hdfs://master:9000/outputs/q2_rdd')
print("Execution Time for q2(rdd) is: ",time.time() - start_time)

## Q3: Από τα top200 charts, να βρεθεί για κάθε μήνα της κάθε χρονιάς, το μέσο ημερήσιο πλήθος streams του τραγουδιού που βρίσκεται στην θέση 1 (δείτε «Υποδείξεις»), ταξινομημένα ως προς την χρονιά και τον μήνα.

In [None]:
q3_filtered = charts.filter(lambda x: x[2] == '1' and x[5] == 'top200')

In [None]:
q3_rdd_dist_count = q3_filtered.map(lambda x: ((x[3].split("T")[0][0:4], x[3].split("T")[0][5:7], x[3].split("T")[0][8:]), 1))\
                               .reduceByKey(lambda x, y: x + y)\
                               .map(lambda x: ((x[0][0], x[0][1]), 1))\
                               .reduceByKey(lambda x, y: x + y)

In [None]:
q3_rdd_sum = q3_filtered.map(lambda x: ((x[3].split("T")[0][0:4], x[3].split("T")[0][5:7]), int(x[7])))\
                        .reduceByKey(lambda x, y: x + y)

In [None]:
q3_rdd_joined = q3_rdd_dist_count.join(q3_rdd_sum)\
                                 .map(lambda x: (x[0], x[1][1]/x[1][0]))\
                                 .sortByKey()

In [None]:
start_time = time.time()
q3_rdd_joined.map(toCSVLine).coalesce(1).saveAsTextFile('hdfs://master:9000/outputs/q3_rdd')
print("Execution Time for q3(rdd) is: ",time.time() - start_time)

## Q4: Από τα viral50 charts, βρείτε για κάθε χώρα το (ή τα σε περίπτωση ισοψηφίας) τραγούδια με το μεγαλύτερο πλήθος παραμονής στο charts. Ταξινομείστε τα αποτελέσματα σας ως προς το όνομα της χώρας και το όνομα του τραγουδιού. Ως αποτέλεσμα δώστε μία γραμμή για κάθε τραγούδι κάθε χώρας στην μορφή :
χώρα, id_τραγουδιού, όνομα_τραγουδιού, πλήθος_παραμονής_στο_viral50

In [15]:
from typing import Tuple, List

def myFunc(x, y):
    if x[3] > y[3]:
        return x
    elif x[3] == y[3]: 
        return x + y
    else:
        return y

def processor(x):
    if len(x) > 4:
        conv_to_list = list(x)
        times = len(conv_to_list) // 4
        t = [conv_to_list[((len(conv_to_list) - 4)*start):(start+1)*(len(conv_to_list) - 4):] for start in range(times)]
        return tuple((v[0], v[1], v[2], v[3]) for v in t)
    else:
        return (x[0], (x[1][0], x[1][1], x[1][2], x[1][3]))

q4_rdd = charts.filter(lambda x: x[5] == "viral50")\
               .map(lambda x: ((x[4], x[0], x[1]), 1))\
               .reduceByKey(lambda x, y: x + y)\
               .map(lambda x: (x[0][0], x[0][1], x[0][2], x[1]))\
               .keyBy(lambda x: x[0])\
               .reduceByKey(myFunc)\
               .map(lambda x: (x[1][3], (x[1][0], x[1][1], x[1][2])))\
               .sortByKey(ascending=False)\
               .map(lambda x: (x[1][0], (x[1][1], x[1][2], x[0])))\
               .join(regions)\
               .map(lambda x: ((x[1][1], x[1][0][1]), (x[1][0][0], x[1][0][2])))\
               .sortByKey(ascending=True)\
               .map(lambda x: (x[0][0], x[1][0], x[0][1], x[1][1]))
                
#.flatMap(lambda x: map(lambda y: y, processor(x)))\

In [16]:
start_time = time.time()
q4_rdd.map(toCSVLine).coalesce(1).saveAsTextFile('hdfs://master:9000/outputs/q4_rdd')
print("Execution Time for q5(rdd) is: ",time.time() - start_time)

Execution Time for q5(rdd) is:  2.2607812881469727


## Q5:Σύμφωνα με τα top200, βρείτε σε κάθε χρονιά τον καλλιτέχνη με το μεγαλύτερο μέσο πλήθος streams. Ταξινομείστε ως προς τη χρονιά.
Ως αποτελέσμα, δώστε για κάθε χρονιά μία γραμμή στην εξής μορφή
χρονιά, όνομα_καλλιτέχνη, μέσο_πλήθος_streams

In [17]:
q5_rdd = charts.filter(lambda x: x[5] == "top200")\
               .map(lambda x: (x[0], (x[3].split("T")[0].split("-")[0], int(x[7]))))\
               .leftOuterJoin(chart_artist_mapping)\
               .map(lambda x: (x[1][1], (x[0], x[1][0][0], x[1][0][1])))\
               .join(artists)\
               .map(lambda x: ((x[1][0][1], x[0], x[1][1]), x[1][0][2]))\
               .reduceByKey(lambda x, y: x + y)\
               .map(lambda x: (x[0][0], x[0][2], x[1]/69))\
               .keyBy(lambda x: x[0])\
               .reduceByKey(lambda x,y: x if x[2] >= y[2] else y)\
               .values()

In [18]:
start_time = time.time()
q5_rdd.map(toCSVLine).coalesce(1).saveAsTextFile('hdfs://master:9000/outputs/q5_rdd')
print("Execution Time for q5(rdd) is: ",time.time() - start_time)

Execution Time for q5(rdd) is:  233.56186985969543


## Q6: Για την Ελλάδα, βρείτε για κάθε χρονιά και chart τον καλλιτέχνη (ή τους καλλιτέχνες) που έχει (έχουν) παραμείνει διαδοχικές ημέρες περισσότερες φορές στο #1 κάποιο από τα τραγούδια του. Ταξινομείστε ως προς το chart και τη χρονιά.
Ως αποτελέσμα δώστε μία γραμμή για κάθε καλλιτέχνη κάθε χρονιάς στη μορφή: όνομα_chart, χρονιά, όνομα καλλιτέχνη, ημέρες_διαδοχικής_παραμονής_#1

In [None]:
q6_rdd = charts.filter(lambda x: x[2] == '1' and x[6] == 'SAME_POSITION' and x[4] == '23')\
               .map(lambda x: ((x[5], x[3].split("-")[0], x[0]), 1))\
               .reduceByKey(lambda x, y: x + y)\
               .map(lambda x: ((x[0][0], x[0][1], x[1]), x[0][2]))

In [None]:
q6_rdd_max = q6_rdd.map(lambda x: ((x[0][0], x[0][1]), x[0][2]))\
                    .reduceByKey(lambda x, y: x if x >= y else y)\
                    .map(lambda x: ((x[0][0], x[0][1], x[1]), 1))

In [None]:
q6_rdd_joined = q6_rdd.join(q6_rdd_max)\
                      .map(lambda x: ((x[1][0]), (x[0][0], x[0][1], x[0][2])))\
                      .join(chart_artist_mapping)\
                      .map(lambda x: (x[1][1], (x[1][0][0], x[1][0][1], x[1][0][2])))\
                      .join(artists)\
                      .map(lambda x: (x[1][0][0], x[1][0][1], x[1][1], x[1][0][2]))\
                      .sortBy(lambda x: x[0], ascending=False)

In [None]:
start_time = time.time()
q6_rdd_joined.map(toCSVLine).coalesce(1).saveAsTextFile('hdfs://master:9000/outputs/q6_rdd')
print("Execution Time for q6(rdd) is: ",time.time() - start_time)

In [None]:
# for x in q6_rdd_joined.take(100):
#     print(x)

In [None]:
spark.stop()