In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import numpy as np
import pandas as pd
import math

In [2]:
# Parse the tab-separated dump into one record per input line:
#   [field0, field1, field2, field3, [remaining fields as floats]]
# The four leading fields are kept as strings (presumably album id, album
# name, track id and track name, matching the AI/AN/TI/TN schema columns --
# TODO confirm against the dataset description).
albums = []
# `with` releases the file handle even if a malformed line raises mid-parse.
with open('/dbfs/FileStore/tables/dataset_7k_2010-f3f46.txt', 'r') as albumFile:
  for line in albumFile:
    line = line.rstrip()
    # A blank line (e.g. a trailing newline at EOF) carries no record;
    # without this guard it would crash on the field indexing below.
    if not line:
      continue
    fields = line.split('\t')
    # Explicit indexing keeps the IndexError for lines with fewer than
    # four fields instead of silently emitting a short record.
    albums.append([fields[0], fields[1], fields[2], fields[3],
                   [float(f) for f in fields[4:]]])

  
# Schema of one parsed record: four string columns followed by the array
# of double-precision feature values.
stringColumns = [StructField(name, StringType()) for name in ("AI", "AN", "TI", "TN")]
featureColumn = StructField("FTS", ArrayType(DoubleType()))
albumSchema = StructType(stringColumns + [featureColumn])

# Turn the parsed Python records into a Spark DataFrame.
albumDF = sqlContext.createDataFrame(albums, albumSchema)

In [3]:
# Peek at a single grouped row to see what collecting FTS per AI looks like.
(
    albumDF
    .groupBy('AI')
    .agg(collect_list('FTS'))
    .take(1)
)

In [4]:
# One row per AI, with every FTS array of that group gathered into a list.
# NOTE(review): collect_list does not guarantee element order after a
# shuffle -- confirm that the order of the collected arrays is acceptable
# for whatever consumes this list downstream.
groupedByAlbum = albumDF.groupBy('AI')
mergedAlbumDF = groupedByAlbum.agg(collect_list('FTS').alias('FTS'))

In [5]:
def interpolateRow(row, targetLength=24, numFeatures=13):
  """Stretch one album's per-track features onto a fixed-length grid.

  Each track is assigned a slot in a grid of `targetLength` rows. With more
  than one track, the first track lands in slot 0 and the last in slot
  `targetLength - 1`; the empty slots are spread evenly between consecutive
  tracks, and any leftover slots go one each to the earliest gaps. Empty
  slots are then filled by pandas' default (linear) interpolation,
  independently for each feature column. With a single track, that track
  sits in slot 0 and the remaining slots are filled by the same call.

  Args:
    row: object exposing `AI` (the group key, passed through unchanged)
      and `FTS` (a sequence of per-track feature sequences, each holding
      at least `numFeatures` values).
    targetLength: number of slots in the output grid.
    numFeatures: number of leading feature values used from each track.

  Returns:
    `[row.AI, grid]`, where `grid` is a list of `targetLength` lists of
    `numFeatures` floats.

  Raises:
    ValueError: if the row has no tracks or more tracks than slots.
    IndexError: if a track holds fewer than `numFeatures` values.
  """
  tracks = row.FTS
  trackCount = len(tracks)
  if trackCount == 0 or trackCount > targetLength:
    raise ValueError('album %s has %d tracks; expected between 1 and %d'
                     % (row.AI, trackCount, targetLength))

  # Slot index of every track. The layout is the same for all features, so
  # it is computed once per row rather than once per feature.
  if trackCount == 1:
    # Handled separately to avoid dividing by zero gaps.
    slots = [0]
  else:
    interSpace, additionalSpace = divmod(targetLength - trackCount, trackCount - 1)
    slots = [k * (interSpace + 1) + min(k, additionalSpace)
             for k in range(trackCount)]

  # Start from an all-NaN grid and drop each track into its slot.
  grid = np.full((targetLength, numFeatures), np.nan)
  for slot, features in zip(slots, tracks):
    # Explicit indexing keeps the IndexError for tracks that are too short.
    grid[slot] = [features[i] for i in range(numFeatures)]

  # One column-wise interpolation call fills every feature at once.
  filled = pd.DataFrame(grid).interpolate()
  return [row.AI, filled.values.tolist()]


# Keep only rows whose FTS list has at most 24 entries, interpolate each of
# them, and cache the resulting RDD.
finalAlbums = (
    mergedAlbumDF
    .filter(size(mergedAlbumDF.FTS) <= 24)
    .rdd
    .map(interpolateRow)
    .cache()
)

In [6]:
def stringify(data):
  """Flatten one `[key, grid]` record into a single tab-separated line.

  The line starts with `data[0]`; every value of every inner sequence of
  `data[1]` follows in order, each preceded by a tab and formatted with
  `%f`.
  """
  albumKey, featureGrid = data[0], data[1]
  cells = ['\t%f' % value for slot in featureGrid for value in slot]
  # With nothing to append, hand the key back untouched.
  return albumKey + ''.join(cells) if cells else albumKey

In [7]:
%fs rm -r dbfs:/FileStore/tables/2010.tsv

In [8]:
# Write the interpolated records out as text, one stringified record per line
(
    finalAlbums
    .map(stringify)
    .saveAsTextFile('dbfs:/FileStore/tables/2010.tsv')
)

In [9]:
# Sanity check: read the saved output back and show its first three lines
(
    sc
    .textFile('dbfs:/FileStore/tables/2010.tsv')
    .take(3)
)