In [None]:
############################################################################################### 
#  general use
############################################################################################### 
import time
from math import pow, floor

############################################################################################### 
#  data access
############################################################################################### 
from google.datalab import Context
import google.datalab.storage as storage

import csv
import urllib2
from StringIO import StringIO



############################################################################################### 
#  PySpark - basic
############################################################################################### 

#####https://docs.databricks.com/spark/latest/spark-sql/udf-in-python.html
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
#from pyspark.sql.types import *
from pyspark.sql.functions import struct


############################################################################################### 
#  PySpark - machine learning - http://spark.apache.org/docs/2.0.0/api/python/pyspark.ml.html
############################################################################################### 

##### https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.CountVectorizer
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF

##### http://spark.apache.org/docs/2.0.0/api/python/pyspark.ml.html#pyspark.ml.clustering.LDA
from pyspark.ml.clustering import LDA
from pyspark.ml.clustering import DistributedLDAModel


In [111]:
# import testing space

In [None]:
# list your storage buckets, printing each one on its own line
all_buckets = list(storage.Buckets())
for bucket in all_buckets:
  print(bucket)

In [None]:
# look up the default project id; kept just for reference
project = Context.default().project_id

# open the storage bucket you want and print the key and size of each object
my_bucket = storage.Bucket('machine-learning-backend-storagebucket')
for obj in my_bucket.objects():
  size_text = str(obj.metadata.size)
  print(obj.key + '   size: ' + size_text)

In [None]:
############################################################################################### 
#
#  two ways to read data in from Google cloud storage into python lists
#
############################################################################################### 
# read_stream() - produces a str object, which csv.reader parses into rows
d = my_bucket.object('sas_data/airlines.csv').read_stream()
data = list(csv.reader(StringIO(d)))
print(data[:3])
#del(d)

In [None]:
# urllib2 to read the public link of your gcs object
url = 'https://storage.googleapis.com/machine-learning-backend-storagebucket/sas_data/airlines.csv'
# urllib2 responses are not context managers in Python 2, so close the
# connection explicitly instead of leaving it to garbage collection
response = urllib2.urlopen(url)
try:
  data = list(csv.reader(StringIO(response.read())))
finally:
  response.close()
print(data[:3])

In [84]:
############################################################################################### 
#
#  read data from Google cloud storage into spark
#
###############################################################################################
# without inferSchema every column is loaded as a string
df = spark.read.csv(
  "gs://machine-learning-backend-storagebucket/sas_data/airlines.csv",
  header=True,
)
print('Spark infers all data types are string when reading from csv: https://github.com/databricks/spark-csv')
print(df.dtypes)
df.show(n=5)

Spark infers all data types are string when reading from csv: https://github.com/databricks/spark-csv
[('id', 'string'), ('airline', 'string'), ('date', 'string'), ('location', 'string'), ('rating', 'string'), ('cabin', 'string'), ('value', 'string'), ('recommended', 'string'), ('review', 'string')]
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
|   id|        airline|     date|location|rating|   cabin|value|recommended|              review|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
|10001|Delta Air Lines|21-Jun-14|Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|
|10002|Delta Air Lines|19-Jun-14|     USA|     0| Economy|    2|         NO|Flight 2463 leavi...|
|10003|Delta Air Lines|18-Jun-14|     USA|     0| Economy|    1|         NO|Delta Website fro...|
|10004|Delta Air Lines|17-Jun-14|     USA|     9|Business|    4|        YES|"I just returned ...|
|10005|Delta 

In [85]:
############################################################################################### 
#
#  re-read the data from Google cloud storage into spark, this time inferring column types
#
###############################################################################################
# inferSchema=True lets Spark derive column types instead of defaulting to string
df = spark.read.csv(
  "gs://machine-learning-backend-storagebucket/sas_data/airlines.csv",
  header=True,
  inferSchema=True,
)
print('inferSchema requires an additional pass over the data: https://github.com/databricks/spark-csv')
print(df.dtypes)
df.show(n=5)

inferSchema requires an additional pass over the data: https://github.com/databricks/spark-csv
[('id', 'int'), ('airline', 'string'), ('date', 'string'), ('location', 'string'), ('rating', 'int'), ('cabin', 'string'), ('value', 'int'), ('recommended', 'string'), ('review', 'string')]
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
|   id|        airline|     date|location|rating|   cabin|value|recommended|              review|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+
|10001|Delta Air Lines|21-Jun-14|Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|
|10002|Delta Air Lines|19-Jun-14|     USA|     0| Economy|    2|         NO|Flight 2463 leavi...|
|10003|Delta Air Lines|18-Jun-14|     USA|     0| Economy|    1|         NO|Delta Website fro...|
|10004|Delta Air Lines|17-Jun-14|     USA|     9|Business|    4|        YES|"I just returned ...|
|10005|Delta Air Lines|17-Jun

In [133]:
############################################################################################### 
#
#  start topic analysis
#
###############################################################################################
# calculate number of topics as floor(row_count ** (1/2.5))
# LDA's k must be greater than 1, so never go below 2 on very small datasets
num_topics = max(2, int(floor(pow(df.count(), 1 / 2.5))))
print(num_topics)

15


In [75]:
def list_from_stream(bucket_name, bucket_object_name):
  """Read a CSV-formatted object from a storage bucket into one flat list.

  bucket_name        -- name passed to storage.Bucket
  bucket_object_name -- key of the object to read inside that bucket

  Every field of every parsed row is returned, in file order, as a single list.
  """
  stream_text = storage.Bucket(bucket_name).object(bucket_object_name).read_stream()
  flattened = []
  for fields in csv.reader(StringIO(stream_text)):
    flattened.extend(fields)
  return flattened
  
# load stopwords from google storage bucket and preview the first ten
stopwords = list_from_stream(
  bucket_name='machine-learning-backend-storagebucket',
  bucket_object_name='sas_data/stop_words.txt',
)
print(stopwords[:10])

["'d", "'ll", "'m", "'re", "'s", "'ve", 'a', 'aboard', 'about', 'above']


In [90]:
# clean the text
def cleanup_text(record, stop_words=None):
    """Tokenize the review text of one row and drop noise words.

    record     -- row-like sequence; the review text is read from position 8
    stop_words -- optional collection of lower-case words to drop; when omitted
                  the notebook-level ``stopwords`` list is used

    Returns a list of lower-cased, whitespace-separated tokens that are longer
    than two characters and are not stop words. A missing (None) review
    yields an empty list.
    """
    text = record[8]
    # a null review would otherwise fail on .split() with AttributeError
    if text is None:
        return []
    if stop_words is None:
        stop_words = stopwords
    text_out = []
    for word in text.split():
        lowered = word.lower()  # lower-case once, reuse for lookup and output
        # Remove stopwords and words of two characters or fewer
        if len(word) > 2 and lowered not in stop_words:
            text_out.append(lowered)
    return text_out

# wrap cleanup_text in a Spark UDF that returns an array of strings,
# then feed it every column of each row packed into a single struct
udf_cleantext = udf(cleanup_text, ArrayType(StringType()))
all_columns = struct([df[name] for name in df.columns])
clean_text = df.withColumn("words", udf_cleantext(all_columns))

In [91]:
# view the first five rows of the data with the added "words" column
clean_text.show(n=5)

+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+--------------------+
|   id|        airline|     date|location|rating|   cabin|value|recommended|              review|               words|
+-----+---------------+---------+--------+------+--------+-----+-----------+--------------------+--------------------+
|10001|Delta Air Lines|21-Jun-14|Thailand|     7| Economy|    4|        YES|Flew Mar 30 NRT t...|[flew, mar, nrt, ...|
|10002|Delta Air Lines|19-Jun-14|     USA|     0| Economy|    2|         NO|Flight 2463 leavi...|[flight, 2463, le...|
|10003|Delta Air Lines|18-Jun-14|     USA|     0| Economy|    1|         NO|Delta Website fro...|[delta, website, ...|
|10004|Delta Air Lines|17-Jun-14|     USA|     9|Business|    4|        YES|"I just returned ...|[returned, round-...|
|10005|Delta Air Lines|17-Jun-14| Ecuador|     7| Economy|    3|        YES|"Round-trip fligh...|["round-trip, fli...|
+-----+---------------+---------+--------+------

In [138]:
# Term Frequency Vectorization over the "words" column
cv = CountVectorizer(
  inputCol="words",
  outputCol="rawFeatures",
  vocabSize=1000,
  minDF=4,
)
cvmodel = cv.fit(clean_text)
featurizedData = cvmodel.transform(clean_text)

# keep the fitted vocabulary so term indices can be mapped back to words
vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)

# rescale the raw counts by inverse document frequency (TFIDF)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

# show the transformed data
rescaledData.select(['review', 'words', 'rawFeatures', 'features']).show(n=5)

+--------------------+--------------------+--------------------+--------------------+
|              review|               words|         rawFeatures|            features|
+--------------------+--------------------+--------------------+--------------------+
|Flew Mar 30 NRT t...|[flew, mar, nrt, ...|(1000,[0,2,11,21,...|(1000,[0,2,11,21,...|
|Flight 2463 leavi...|[flight, 2463, le...|(1000,[0,3,6,7,10...|(1000,[0,3,6,7,10...|
|Delta Website fro...|[delta, website, ...|(1000,[0,2,4,7,12...|(1000,[0,2,4,7,12...|
|"I just returned ...|[returned, round-...|(1000,[0,1,2,3,7,...|(1000,[0,1,2,3,7,...|
|"Round-trip fligh...|["round-trip, fli...|(1000,[0,7,12,16,...|(1000,[0,7,12,16,...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [112]:
# Generate num_topics Data-Driven Topics:
# "em" = expectation-maximization
lda = LDA(k=num_topics, seed=123, optimizer="em", featuresCol="features")
ldamodel = lda.fit(rescaledData)

print(ldamodel.isDistributed())
print(ldamodel.vocabSize())

ldatopics = ldamodel.describeTopics()
# Show every topic; the row count follows num_topics instead of a fixed number
ldatopics.show(num_topics)

True
1000
+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[18, 35, 56, 13, ...|[0.01890493052272...|
|    1|[141, 39, 248, 68...|[0.01303530958012...|
|    2|[106, 48, 217, 64...|[0.01089011198031...|
|    3|[143, 94, 66, 291...|[0.02216471707805...|
|    4|[7, 671, 620, 569...|[0.01129101223822...|
|    5|[84, 25, 13, 29, ...|[0.01197915576566...|
|    6|[122, 167, 536, 6...|[0.01708510087273...|
|    7|[77, 88, 22, 269,...|[0.01454564644437...|
|    8|[410, 276, 23, 73...|[0.01221927595976...|
|    9|[193, 15, 214, 24...|[0.01777650397472...|
|   10|[2, 28, 464, 9, 9...|[0.00928587923362...|
|   11|[287, 5, 205, 8, ...|[0.01677499329144...|
|   12|[47, 40, 30, 172,...|[0.02483340607833...|
|   13|[115, 134, 6, 150...|[0.01489504302793...|
|   14|[62, 249, 7, 486,...|[0.01607533423240...|
+-----+--------------------+--------------------+



In [139]:
# generate topic summary: one [topic_id, term, term, ...] list per topic
topic_summary = list()
number_of_terms_per_topic = 6 # max of 10 (describeTopics() default term count)
for row in ldatopics.collect():
  # map the leading term indices of this topic back to vocabulary words
  terms = [vocab[idx] for idx in row.termIndices[:number_of_terms_per_topic]]
  topic_summary.append([row.topic] + terms)

for topic in topic_summary:
  # topic ids are zero-based; print them one-based
  print("Topic " + str(topic[0]+1) + "  Terms: " + str(topic[1:]))

Topic 1  Terms: [u'gate', u'connecting', u'later', u'told', u'delayed', u'next']
Topic 2  Terms: [u'excellent', u'airways', u'lounge', u'envoy', u'heathrow', u'coach']
Topic 3  Terms: [u'help', u'people', u'kept', u'daughter', u'plane', u'delta']
Topic 4  Terms: [u'phoenix', u'san', u'southwest', u'online', u'diego', u'upgrade']
Topic 5  Terms: [u'delta', u'segments', u'son', u'eat', u'seattle', u'seat']
Topic 6  Terms: [u'agent', u'airport', u'told', u'due', u'mechanical', u'canceled']
Topic 7  Terms: [u'denver', u'phl', u'representative', u'mexico', u'return', u'carry']
Topic 8  Terms: [u'philadelphia', u'aircraft', u'airline', u'charge', u'clt', u'airways']
Topic 9  Terms: [u'tokyo', u'personal', u'good', u'friendly', u'terrible', u'delta']
Topic 10  Terms: [u'overhead', u'seat', u'sfo', u'price', u'check', u'dtw']
Topic 11  Terms: [u'flights', u'boarding', u'boarding.', u'seats', u'2nd', u'plane']
Topic 12  Terms: [u'group', u'first', u'informed', u'united', u'lax', u'people']
Topi

In [None]:
# Save and load model
# pyspark.ml models are saved by path only (no SparkContext argument);
# overwrite any earlier copy so this cell can be re-run
lda_model_path = "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel"
ldamodel.write().overwrite().save(lda_model_path)
# the "em" optimizer produces a distributed model, so reload with the matching class
sameModel = DistributedLDAModel.load(lda_model_path)