In [15]:
# from utils import hdf5_getters
import os
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from collections import OrderedDict
from pyspark import SparkFiles
from tables import *
#import pydoop.hdfs as hpath

# Build (or reuse) the Spark session for this notebook. The application runs
# in client deploy mode against the standalone master below, with dynamic
# allocation switched off, at most 4 cores in total and 1g of memory per executor.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.1.16:7077")
    .appName("Data Analysis")
    .config("spark.submit.deployMode", "client")
    .config("spark.dynamicAllocation.enabled", "false")
    .config("spark.cores.max", "4")
    .config("spark.executor.memory", "1g")
    .getOrCreate()
)

spark_context = spark_session.sparkContext
# Register the HDF5 getter module as a Python dependency for tasks run on this context.
spark_context.addPyFile("utils/hdf5_getters.py")
# SQLContext expects the SparkContext as its first argument; the session is
# passed explicitly so the context wraps this session rather than a new one.
sqlContext = SQLContext(spark_context, spark_session)

In [242]:
# def recursive_file_search(rootDir, songs):
#     for lists in os.listdir(rootDir):
#         path = os.path.join(rootDir, lists)
#         if os.path.isdir(path):
#             recursive_file_search(path, songs)
#         else:
#             songs.append(str(path))

In [253]:
def recursive_file_search(rootDir, songs):
    """
    Collect the paths of all files below a local directory.

    Walks `rootDir` recursively and appends the path of every non-directory
    entry to `songs` (as `str`). Entries of each directory are visited in
    sorted name order, so the resulting list is deterministic.

    Parameters:
        rootDir: path of an existing directory on the local filesystem.
        songs: list that is extended in place with the file paths found.

    Returns:
        None; results are delivered through `songs`.

    Raises:
        OSError: if `rootDir` (or a nested directory) cannot be listed.
    """
    # List the directory entries; iterating over `rootDir` itself would walk
    # the characters of the path string.
    for entry in sorted(os.listdir(rootDir)):
        path = os.path.join(rootDir, entry)
        if os.path.isdir(path):
            recursive_file_search(path, songs)
        else:
            songs.append(str(path))

In [300]:
def open_h5_file_read(h5filename):
    """
    Open an existing HDF5 file in read-only mode and return the file handle.

    The caller is responsible for closing the returned handle.
    Same function as in hdf5_utils, duplicated here so we avoid one import.
    """
    # Imported locally: the notebook only does `from tables import *`, which
    # does not bind the module name `tables` that is used below.
    import tables
    return tables.open_file(h5filename, mode='r')

def get_song_id(h5,songidx=0):
    """
    Return the song id at row `songidx` of the metadata songs table of an
    open HDF5 song file (row 0, i.e. the first song, by default).
    """
    song_id_column = h5.root.metadata.songs.cols.song_id
    return song_id_column[songidx]

def get_artist_name(h5,songidx=0):
    """
    Return the artist name at row `songidx` of the metadata songs table of an
    open HDF5 song file (row 0, i.e. the first song, by default).
    """
    artist_name_column = h5.root.metadata.songs.cols.artist_name
    return artist_name_column[songidx]

def get_title(h5,songidx=0):
    """
    Return the title at row `songidx` of the metadata songs table of an
    open HDF5 song file (row 0, i.e. the first song, by default).
    """
    title_column = h5.root.metadata.songs.cols.title
    return title_column[songidx]

def get_loudness(h5,songidx=0):
    """
    Return the loudness at row `songidx` of the analysis songs table of an
    open HDF5 song file (row 0, i.e. the first song, by default).
    """
    loudness_column = h5.root.analysis.songs.cols.loudness
    return loudness_column[songidx]

def get_year(h5,songidx=0):
    """
    Return the release year at row `songidx` of the musicbrainz songs table
    of an open HDF5 song file (row 0, i.e. the first song, by default).
    """
    year_column = h5.root.musicbrainz.songs.cols.year
    return year_column[songidx]

def get_tempo(h5,songidx=0):
    """
    Return the tempo at row `songidx` of the analysis songs table of an
    open HDF5 song file (row 0, i.e. the first song, by default).
    """
    tempo_column = h5.root.analysis.songs.cols.tempo
    return tempo_column[songidx]

def get_danceability(h5,songidx=0):
    """
    Return the danceability at row `songidx` of the analysis songs table of
    an open HDF5 song file (row 0, i.e. the first song, by default).
    """
    danceability_column = h5.root.analysis.songs.cols.danceability
    return danceability_column[songidx]

def get_artist_mbtags(h5,songidx=0):
    """
    Get the artist musicbrainz tag array for one song of an HDF5 song file.

    Takes care of the proper indexing if we are in an aggregate file: the
    tags of song `songidx` run from that song's `idx_artist_mbtags` offset up
    to the next song's offset, or to the end of the array for the last row.
    By default, returns the array for the first song in the h5 file.
    To get a regular numpy ndarray, cast the result to: numpy.array( )
    """
    musicbrainz = h5.root.musicbrainz
    # The start offsets belong to the musicbrainz songs table; both branches
    # must read them from there.
    offsets = musicbrainz.songs.cols.idx_artist_mbtags
    first = offsets[songidx]
    if musicbrainz.songs.nrows == songidx + 1:
        # Last row: the tags extend to the end of the array.
        return musicbrainz.artist_mbtags[first:]
    return musicbrainz.artist_mbtags[first:offsets[songidx + 1]]

In [23]:
# Earlier local-filesystem input, kept for reference:
# mypath = "/home/ubuntu/A.tar.gz/A/MillionSongSubset/data/A/A"
mypath = "hdfs://192.168.1.16:9000/user/ubuntu/songs_ABC.csv"

songs = []

# Read the CSV with its first line as the header and mark the result for caching.
data_frame = (
    spark_session.read
    .option('header', 'true')
    .csv(mypath)
    .cache()
)

# Earlier experiments, kept for reference:
#map(lambda x: x).take(10)
#map(lambda x: hdf5_getters.open_h5_file_read(x)).take(10)
#.map(lambda x: hdf5_getters.get_title(x)).take(5)

# recursive_file_search(mypath, songs)
# songs_rdd = spark_context.parallelize(songs)
# print("All songs in specified directory appended")

# Last expression of the cell: the number of rows read.
data_frame.count()

311831

In [26]:
# Print the column names and types, then the mean tempo for each release year.
data_frame.printSchema()
(
    data_frame
    .groupBy('release year')
    .agg({'tempo': 'mean'})
    .show()
)

root
 |-- track id: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- title: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- tempo: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- release year: string (nullable = true)
 |-- danceability: string (nullable = true)

+------------+------------------+
|release year|        avg(tempo)|
+------------+------------------+
|        1953|119.91335714285714|
|        1957|113.82883419689121|
|        1987|126.65779720704309|
|        1956|115.90801081081081|
|        1936|         92.119125|
|        1958|115.24875287356323|
|        1943|118.66366666666666|
|        1972|124.22721928665784|
|        1931|          101.9015|
|        1988|124.12052522421527|
|        1926| 98.41649999999998|
|        1938|127.02999999999999|
|        1932|110.72800000000001|
|        1977| 130.5529619631902|
|        1971|124.80250834597874|
|        1984|129.13781800197827|
|        1982|128.88615700267619

In [None]:
def iterate_songs(song):
    """
    Read one HDF5 song file and return its fields as a plain dict.

    Parameters:
        song: file name passed unchanged to `open_h5_file_read`.

    Returns:
        dict with the keys 'track_id', 'artist', 'title' (str, decoded as
        UTF-8), 'loudness', 'tempo', 'danceability' (float), 'release_year'
        (int) and 'tags' (list of str, decoded as UTF-8).

    The file handle is closed before returning, also when reading a field fails.
    """
    h5 = open_h5_file_read(song)
    try:
        track_id = str(get_song_id(h5), "utf-8")
        artist = str(get_artist_name(h5), "utf-8")
        title = str(get_title(h5), "utf-8")
        loudness = float(get_loudness(h5))
        release_year = int(get_year(h5))
        tempo = float(get_tempo(h5))
        danceability = float(get_danceability(h5))

        # The tag array holds bytes; convert it to a list of plain strings.
        tags_refined = [str(tag, "utf-8") for tag in get_artist_mbtags(h5).tolist()]
    finally:
        # Always release the handle, even if a getter or a conversion raised.
        h5.close()

    return {'track_id': track_id,
            'artist': artist,
            'title': title,
            'loudness': loudness,
            'release_year': release_year,
            'tempo': tempo,
            'danceability': danceability,
            'tags': tags_refined}

In [None]:
def convert_to_row(d: dict) -> Row:
    """Build a Row from `d`, passing its fields as keyword arguments in sorted key order."""
    ordered_fields = OrderedDict()
    for key, value in sorted(d.items()):
        ordered_fields[key] = value
    return Row(**ordered_fields)

# Parse every song file into a dict of fields, one RDD element per file.
# NOTE(review): `songs_rdd` is not assigned by any live statement visible in
# this notebook, so this cell fails on a fresh kernel unless it is created
# first — presumably an RDD of HDF5 file paths; confirm.
new_rdd = songs_rdd.map(iterate_songs)

# Turn each dict into a Row and build a DataFrame from the resulting rows.
df = new_rdd.map(convert_to_row).toDF()

In [None]:
# Print the first rows of the DataFrame built from the parsed song files.
df.show()

In [None]:
# Expose `df` to Spark SQL under the name "songs". createOrReplaceTempView is
# the replacement for the deprecated registerTempTable.
df.createOrReplaceTempView("songs")
# Average tempo per release year, highest average first.
spark_session.sql("select avg(tempo), release_year from songs group by release_year order by avg(tempo) desc").show()

In [255]:
# Stop the SparkContext obtained from spark_session; run this last.
spark_context.stop()