# DataPreparation-2.2 (for Spark 2.2)
# Spark 2.2 "seems" to abandom spark context, and switch to spark session

### WARNING!! This notebook will take up at least 6GB memory. 
### Kung-hsiang, Huang 11/25/2017

This notebook will store a dataframe containing key name **index** and tf-idfed **song** in parquet. 

In [1]:
from pyspark.sql.types import StringType
from pyspark import SQLContext, SparkContext
from collections import namedtuple
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
import re
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("COMP 4651") \
    .getOrCreate()


In [2]:
# sqlContext = SQLContext(sc)



#compute the length of song
size_ = udf(lambda xs: len(xs), IntegerType())

#filter out null song value  note: filter == where, dunno why do they create two functions...
raw_df = spark.read.json("data/lyrics.json").where( (col('lyrics')).isNotNull() ).filter(size_(col('lyrics')) >0)

# .na.drop(subset=[""])
# raw_df.select(raw_df['lyrics'], F.where((raw_df['index'] == 160))).show()
raw_df
raw_df.show()

+---------------+-----+-----+--------------------+--------------------+----+
|         artist|genre|index|              lyrics|                song|year|
+---------------+-----+-----+--------------------+--------------------+----+
|beyonce-knowles|  Pop|    0|Oh baby, how you ...|           ego-remix|2009|
|beyonce-knowles|  Pop|    1|playin' everythin...|        then-tell-me|2009|
|beyonce-knowles|  Pop|    2|If you search
For...|             honesty|2009|
|beyonce-knowles|  Pop|    3|Oh oh oh I, oh oh...|     you-are-my-rock|2009|
|beyonce-knowles|  Pop|    4|Party the people,...|       black-culture|2009|
|beyonce-knowles|  Pop|    5|I heard
Church be...|all-i-could-do-wa...|2009|
|beyonce-knowles|  Pop|    6|This is just anot...|  once-in-a-lifetime|2009|
|beyonce-knowles|  Pop|    7|Waiting, waiting,...|             waiting|2009|
|beyonce-knowles|  Pop|    8|[Verse 1:]
I read...|           slow-love|2009|
|beyonce-knowles|  Pop|    9|N-n-now, honey
Yo...|why-don-t-you-lov...|2009|

In [3]:
STOPWORDS_PATH = 'data/stopwords.txt'
stopwords = set(spark.read.text(STOPWORDS_PATH).rdd.map(lambda x: x.value).collect())

In [4]:
split_regex = r'\W+'
def tokenize(string):
    """ An implementation of input string tokenization that excludes stopwords
    Args:
        string (str): input string
    Returns:
        list: a list of tokens without stopwords
    """
    
    regexed = [ token.lower() for token in re.split(split_regex, string) if len(token)]
    return  [token for token in regexed if token not in stopwords]#get rid of empty stuff

In [5]:
def tf(tokens):
    """ Compute TF
    Args:
        tokens (list of str): input list of tokens from tokenize
    Returns:
        dictionary: a dictionary of tokens to its TF values
    """
    result = {}
    for token in tokens:
        if token not in result:
            result[token] = 1
        else:
            result[token] += 1
    for key in result:
        result[key] /= float(len(tokens))
    return result

In [6]:
def idfs(corpus):
    """ Compute IDF
    Args:
        corpus (RDD): input corpus
    Returns:
        RDD: a RDD of (token, IDF value)
    """
    N = corpus.map(lambda x:  1).reduce(lambda a,b: a+b) + 0.0
    uniqueTokens = corpus.flatMap(lambda x: [(x[1][i], x[0]) for i in range(len(x[1]))])
    tokenCountPairTuple = uniqueTokens.groupByKey().mapValues(lambda x: list(set(x))).mapValues(lambda x: len(x))
    tokenSumPairTuple = tokenCountPairTuple.reduceByKey(lambda a,b: a+b).mapValues(lambda x: N/x)
    return (tokenSumPairTuple)

In [7]:
def tfidf(tokens, idfs):
    """ Compute TF-IDF
    Args:
        tokens (list of str): input list of tokens from tokenize
        idfs (dictionary): record to IDF value
    Returns:
        dictionary: a dictionary of records to TF-IDF values
    """
    # tf returns a dictionary of token to its tf
    tfs = tf(tokens)
    #multiply the two dictionaries
    tfIdfDict = dict((k, v * idfs[k]) for k, v in tfs.items() if k in idfs)
    return tfIdfDict

In [8]:

# get rid of special character and split, then tokenize
raw_to_token = raw_df.select('index','lyrics').rdd.map(lambda x: (x[0],re.sub(r"[^a-zA-Z0-9]+", ' ', x[1]).split(" ") ))\
.map(lambda x: (x[0],  tokenize(' '.join(x[1])) ))
raw_to_token.take(1)

[(0,
  [u'oh',
   u'baby',
   u'know',
   u'm',
   u'gonna',
   u'cut',
   u'right',
   u'chase',
   u'women',
   u'made',
   u'like',
   u'think',
   u'created',
   u'special',
   u'purpose',
   u'know',
   u'special',
   u'feel',
   u'baby',
   u'let',
   u'get',
   u'lost',
   u'need',
   u'call',
   u'work',
   u'cause',
   u're',
   u'boss',
   u'real',
   u'want',
   u'show',
   u'feel',
   u'consider',
   u'lucky',
   u'big',
   u'deal',
   u'well',
   u'got',
   u'key',
   u'heart',
   u'ain',
   u'gonna',
   u'need',
   u'd',
   u'rather',
   u'open',
   u'body',
   u'show',
   u'secrets',
   u'didn',
   u'know',
   u'inside',
   u'need',
   u'lie',
   u'big',
   u'wide',
   u'strong',
   u'won',
   u'fit',
   u'much',
   u'tough',
   u'talk',
   u'like',
   u'cause',
   u'back',
   u'got',
   u'big',
   u'ego',
   u'huge',
   u'ego',
   u'love',
   u'big',
   u'ego',
   u'much',
   u'walk',
   u'like',
   u'cause',
   u'back',
   u'usually',
   u'm',
   u'humble',
   u'right'

In [9]:
# compute IDFs 
idfsWeights = idfs(raw_to_token).collectAsMap()
# seems that 2.2 doesn't need broadcast??
# idfsBroadcast = spark.broadcast(idfsWeights)

In [10]:
# compute tf-idf value
WeightsRDD = raw_to_token.map(lambda x: (x[0], tfidf(x[1], idfsWeights)))

In [9]:
import numpy as np
#select the top 1000 words as dictionary for vectorization
NUM_FEATURES = 1000
def storeVocablist(raw_to_token):
    
    tokenTF = tf(raw_to_token.flatMap(lambda x: x[1]).collect())
    vocabList = np.array(sorted(tokenTF, key=tokenTF.get, reverse=True)[:NUM_FEATURES ])
    np.save('vocabList',vocabList)
storeVocablist(raw_to_token)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/data/khuangaf/miniconda2/envs/py27/lib/python2.7/site-packages/py4j/java_gateway.py", line 883, in send_command
    response = connection.send_command(command)
  File "/home/data/khuangaf/miniconda2/envs/py27/lib/python2.7/site-packages/py4j/java_gateway.py", line 1040, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
Py4JNetworkError: Error while receiving


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 38538)

Traceback (most recent call last):
  File "/home/data/khuangaf/miniconda2/envs/py27/lib/python2.7/SocketServer.py", line 290, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/home/data/khuangaf/miniconda2/envs/py27/lib/python2.7/SocketServer.py", line 318, in process_request
    self.finish_request(request, client_address)
  File "/home/data/khuangaf/miniconda2/envs/py27/lib/python2.7/SocketServer.py", line 331, in finish_request
    self.RequestHandlerClass(request, client_address, self)
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35867)
Traceback (most recent call last):
  File "/home/data/khuangaf/miniconda2/envs/py27/lib/python2.7/site-packages/py4j/java_gateway.py", line 963, in start
    self.socket.connect((self.address, self.port))
  File "/home/data/khuangaf/miniconda2/envs/py27/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connec


----------------------------------------


  File "/home/data/khuangaf/miniconda2/envs/py27/lib/python2.7/site-packages/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/home/data/khuangaf/miniconda2/envs/py27/lib/python2.7/site-packages/pyspark/serializers.py", line 577, in read_int
    raise EOFError
EOFError


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:35867)

In [13]:
# store tf-idf to parquet
from pyspark.sql.types import Row
def storeTFIDF(WeightsRDD):
    WeightsRDD.map(lambda x: Row(**{'index': x[0], 'song': x[1]})).toDF().write.parquet("tfidf.parquet")

storeTFIDF(WeightsRDD)

In [23]:
from pyspark.sql.types import Row

def storeTFIDFJSON(WeightsRDD):
    WeightsRDD.map(lambda x: Row(**{'index': x[0], 'song': x[1]})).toDF().write.json('tfidf.json')
storeTFIDFJSON(WeightsRDD)