In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

!pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .getOrCreate()

update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode
Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 61kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 55.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=9eb7b6016adc8f21979e245e985198bd93a5f67324cb6fb1f9b103ef53718d50
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Succ

In [10]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://hyd123.s3.us-east-2.amazonaws.com/songtitle_popularity.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("songtitle_popularity.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+----------+--------------------+
|popularity|         song_titles|
+----------+--------------------+
| unpopular|        Good for You|
| unpopular|           Body Heat|
| unpopular|             Strings|
| unpopular|          Aftertaste|
| unpopular|                 Air|
| unpopular|               Crazy|
| unpopular|           Survivors|
| unpopular|   A Little Too Much|
| unpopular|           Young God|
| unpopular|             Control|
| unpopular|         Kid In Love|
| unpopular|         Coming Down|
| unpopular|              Colors|
| unpopular|This Is What It T...|
| unpopular|     Me & the Rhythm|
| unpopular|          Camouflage|
| unpopular|               Angel|
| unpopular|      Never Be Alone|
| unpopular|   Life of the Party|
| unpopular|          As You Are|
+----------+--------------------+
only showing top 20 rows



In [12]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = df.withColumn('length', length(df['song_titles']))
data_df.show()

+----------+--------------------+------+
|popularity|         song_titles|length|
+----------+--------------------+------+
| unpopular|        Good for You|    12|
| unpopular|           Body Heat|     9|
| unpopular|             Strings|     7|
| unpopular|          Aftertaste|    10|
| unpopular|                 Air|     3|
| unpopular|               Crazy|     5|
| unpopular|           Survivors|     9|
| unpopular|   A Little Too Much|    17|
| unpopular|           Young God|     9|
| unpopular|             Control|     7|
| unpopular|         Kid In Love|    11|
| unpopular|         Coming Down|    11|
| unpopular|              Colors|     6|
| unpopular|This Is What It T...|    21|
| unpopular|     Me & the Rhythm|    15|
| unpopular|          Camouflage|    10|
| unpopular|               Angel|     5|
| unpopular|      Never Be Alone|    14|
| unpopular|   Life of the Party|    17|
| unpopular|          As You Are|    10|
+----------+--------------------+------+
only showing top

### Feature Transformations


In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='popularity',outputCol='label')
tokenizer = Tokenizer(inputCol="song_titles", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')


In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [0]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [0]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [17]:
# Show label and resulting features
cleaned.select(['label', 'features']).show(truncate = False)

+-----+------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                                        |
+-----+------------------------------------------------------------------------------------------------------------------------------------------------+
|1.0  |(262145,[16332,113432,252801,262144],[4.817589773939326,4.3067641501733345,2.466214516775848,12.0])                                             |
|1.0  |(262145,[34121,148851,262144],[5.51073695449927,5.916202062607435,9.0])                                                                         |
|1.0  |(262145,[182876,262144],[6.60934924316738,7.0])                                                                                                 |
|1.0  |(262145,[51973,262144],[6.60934924316738,10.0])                            

In [0]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [19]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(10)

+----------+--------------+------+-----+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|popularity|   song_titles|length|label|          token_text|stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+----------+--------------+------+-----+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   popular|        24-Jul|     6|  0.0|            [24-jul]|   [24-jul]|(262144,[11840],[...|(262144,[11840],[...|(262145,[11840,26...|[-79.833612967563...|[0.99999999968946...|       0.0|
|   popular|       7 Rings|     7|  0.0|          [7, rings]| [7, rings]|(262144,[77099,25...|(262144,[77099,25...|(262145,[77099,25...|[-120.01543395637...|[1.0,1.9303252602...|       0.0|
|   popular|       7 Rings|     7|  0.0|          

In [20]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.652366
