In [1]:
# Example EMR Notebook for doing Kmeans clustering on Wine Reviews
# import necessary libraries
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark import SQLContext
from pyspark.sql.functions import col, udf

from itertools import chain
import collections

# ML stuff
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer, StopWordsRemover, NGram, RegexTokenizer, StringIndexer, VectorIndexer
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# concatenate function workaround. 
# concat still has no support for arrays in 2.3.2
def concat(type):
    def concat_(*args):
        return list(chain.from_iterable((arg if arg else [] for arg in args)))
    return udf(concat_, ArrayType(type))
concat_string_arrays = concat(StringType())

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1652845796003_0003,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
sc.list_packages()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

aws-cfn-bootstrap (2.0)
beautifulsoup4 (4.9.3)
boto (2.49.0)
click (8.0.3)
docutils (0.14)
jmespath (0.10.0)
joblib (1.1.0)
lockfile (0.11.0)
lxml (4.6.3)
mysqlclient (1.4.2)
nltk (3.6.5)
nose (1.3.4)
numpy (1.16.5)
pip (9.0.1)
py-dateutil (2.2)
pystache (0.5.4)
python-daemon (2.2.3)
python37-sagemaker-pyspark (1.4.1)
pytz (2021.3)
PyYAML (5.4.1)
regex (2021.10.21)
setuptools (28.8.0)
simplejson (3.2.0)
six (1.13.0)
tqdm (4.62.3)
wheel (0.29.0)
windmill (1.6)

You are using pip version 9.0.1, however version 22.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

In [3]:
# create spark sql context
sqlContext = SQLContext(sc)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Define the schema of your data. Although schema can be implicit, it is good practice 
# to define it if you know what data you're dealing with. If you need a more dynamic
# schema, then you will need to let python implicitly determine the schema.
schema = StructType([
    StructField("id", IntegerType()),
    StructField("country", StringType()),
    StructField("description", StringType()),
    StructField("desgination", StringType()),
    StructField("points", IntegerType()),
    StructField("price", DoubleType()),
    StructField("province", StringType()),
    StructField("region_1", StringType()),
    StructField("region_2", StringType()),
    StructField("taster_twitter_handle", StringType()),
    StructField("title", StringType()),
    StructField("variety", StringType()),
    StructField("winery", StringType())
])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Spark uses this method to read CSV files as per the
# parameters set in the method and the defined schema. Using a CSV file as it is partionable. CSV.gz isnt.
# Caching is an optimization technique if the same RDD/DataFrame is called more than once
wineReviews = sqlContext.read.csv("s3://assignment15bucket/winemag-data-130k-v2.csv", sep=',', quote='"', header=True,  multiLine=True, schema=schema).cache() 

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
wineReviews.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

129971

In [7]:
# Filter out bad data
filteredWineReviews = wineReviews.filter("id is not null and country is not null and description is not null")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
filteredWineReviews.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

129887

In [9]:
# Creates a User defined function that will count the number of elements in an
# array in a given column.
countTokens = udf(lambda words : len(words), IntegerType())

# Converts string to lowercase, then splits by regex. Denotes matching pattern, not splitting gaps.
regexTokenizer = RegexTokenizer(inputCol="description", outputCol="words", pattern="[\'\w\-]+").setGaps(False)

# Remove stop words. You can also include your own list of stopwords.
remover = StopWordsRemover(inputCol=regexTokenizer.getOutputCol(), outputCol="filteredWords") 
# , stopWords=["b"])

# Creates n-grams from tokens. We will only go up to bigrams for this class.
bigram = NGram(n=2, inputCol=remover.getOutputCol(), outputCol="bigrams")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Creates a Spark ML Pipeline that specifies different models that will
# apply at different stages.
comprehensiveTokenizer = Pipeline(stages=[regexTokenizer, remover, bigram])
comprehensiveTokenizerModel = comprehensiveTokenizer.fit(filteredWineReviews)
bigramDataFrame = comprehensiveTokenizerModel.transform(filteredWineReviews)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
bigramDataFrame.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+---------+--------------------+--------------------+------+-----+-----------------+-------------------+-----------------+---------------------+-------------+--------------------+------------------+--------------------+--------------------+--------------------+
| id|  country|         description|         desgination|points|price|         province|           region_1|         region_2|taster_twitter_handle|        title|             variety|            winery|               words|       filteredWords|             bigrams|
+---+---------+--------------------+--------------------+------+-----+-----------------+-------------------+-----------------+---------------------+-------------+--------------------+------------------+--------------------+--------------------+--------------------+
|  0|    Italy|Aromas include tr...|        Vulkà Bianco|    87| null|Sicily & Sardinia|               Etna|             null|        Kerin O’Keefe| @kerinokeefe|Nicosia 2013 Vulk...|       White Blend|

In [12]:
print("Combining unigrams and bigrams")
# Creates the final Dataset. We will only need the words and ngrams to create the final
# list of tokens, and the labels which is points.
finalWords = bigramDataFrame.withColumn("tokens", concat_string_arrays(col("filteredWords"), col("bigrams")))
finalWords.select("description", "tokens").show(22, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Combining unigrams and bigrams
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [13]:
# Get the raw count of each of the terms/tokens
cv = CountVectorizer(inputCol="tokens", outputCol="rawFeatures", minDF=2.0) # minDF=2.0 means a token needs to appear at least twice for it to be considered part of the vocabulary

# Get the logarithmically scaled relevance of each term based on the occurence of
# the term in the entire document
idf = IDF(inputCol="rawFeatures", outputCol="features")

# By default, will add the predictions column to the current DF. 
# Targets a column named "features" by default.
kmeans = BisectingKMeans(k=3, seed=4)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
print("Clustering data")
clusteringPipeline = Pipeline(stages=[cv, idf, kmeans])
clusteringPipelineModel = clusteringPipeline.fit(finalWords)
clustered =  clusteringPipelineModel.transform(finalWords)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Clustering data

In [15]:
# Count the number of members per cluster.
clustered.groupBy("prediction").count().orderBy("prediction").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----+
|prediction|count|
+----------+-----+
|         0|23019|
|         1|56897|
|         2|49971|
+----------+-----+

In [16]:
cluster0 = clustered.select("prediction", "variety", "points", "price").filter("prediction = 0").collect()
df = sqlContext.createDataFrame(cluster0, ["prediction", "variety", "points", "price"])
df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('s3://assignment15bucket/cluster0.csv')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
cluster1 = clustered.select("prediction", "variety", "points", "price").filter("prediction = 1").collect()
df = sqlContext.createDataFrame(cluster1, ["prediction", "variety", "points", "price"])
df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('s3://assignment15bucket/cluster1.csv')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
cluster2 = clustered.select("prediction", "variety", "points", "price").filter("prediction = 2").collect()
df = sqlContext.createDataFrame(cluster2, ["prediction", "variety", "points", "price"])
df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('s3://assignment15bucket/cluster2.csv')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
print("Showing all words in the stage 0 vocabulary")
cvModel = clusteringPipelineModel.stages[0]
vocab = cvModel.vocabulary
print(vocab)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Showing all words in the stage 0 vocabulary

In [20]:
# destroy spark context
sc.stop()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…