In [1]:
import os
import pandas as pd
import numpy as np
import warc
import gzip
import boto
from boto.s3.key import Key
from gzipstream import GzipStreamFile
from mrjob.job import MRJob
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from __future__ import print_function
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors, DenseVector
from pyspark.ml.feature import CountVectorizer, RegexTokenizer, StopWordsRemover
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import lit

In [2]:
import pyspark as ps
sc = ps.SparkContext('local[4]')
sqlContext = ps.SQLContext(sc)

In [3]:
ACCESS_KEY = os.environ['AWS_ACCESS_KEY']
SECRET_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
DSI_CSTON_BUCKET = "galvanize.dsi.capstone.alex"
OUT_BUCKET = "s3a://{}:{}@{}".format(ACCESS_KEY, SECRET_KEY, DSI_CSTON_BUCKET)

In [4]:
wet_path_file_name = 'wet_2016_list/2016-Dec-wet.path'
dec_2016_wet_list = sc.textFile("s3a://%s/%s" % (DSI_CSTON_BUCKET, wet_path_file_name))
first_file = dec_2016_wet_list.first()

In [5]:
conn = boto.connect_s3(ACCESS_KEY, SECRET_KEY, host='s3.amazonaws.com')
pds = conn.get_bucket('commoncrawl')
k = Key(pds, first_file)
f = warc.WARCFile(fileobj=GzipStreamFile(k))

In [6]:
recfilepath = "s3a://%s/recfiles/%s.%s" % (DSI_CSTON_BUCKET, first_file[:-3], 'rec')
urlfilepath = "s3a://%s/recfiles/%s.%s" % (DSI_CSTON_BUCKET, first_file[:-3], 'url')

data = []
url = []

for i, document in enumerate(f):
  if document['Content-Type'] != 'text/plain':
    continue
  dat = document.payload.read()
  data.append(str(dat))
  url.append(str(document.url)) 

In [7]:
rec_df = pd.DataFrame(
    {'contents': data,
     'url': url
    })

In [8]:
rec_rdd_df = sqlContext.createDataFrame(rec_df)
rec_rdd_df.printSchema()

root
 |-- contents: string (nullable = true)
 |-- url: string (nullable = true)



In [9]:
def get_site(url):
   if len(url) <= 3: return ''
   site = url.split('/')[2]
   return site

udf_url_to_website = udf(get_site, StringType())
rec_rdd_df = rec_rdd_df.withColumn("website", udf_url_to_website("url"))
rec_rdd_df.show()

+--------------------+--------------------+--------------------+
|            contents|                 url|             website|
+--------------------+--------------------+--------------------+
|çç
çç	ç...|http://007zhenren...|007zhenrenyuleche...|
|æ¿æ©ç´ é£â§ç¶...|http://0289646723...|0289646723.tranew...|
|ÐÐ¾Ð¿ÑÐ¾ÑÑ Ð¿...|http://03online.c...|        03online.com|
|Ð Ð²Ð¸Ð±ÑÐ°ÑÐ¸...|http://03online.c...|        03online.com|
|ÐÐµÑÐµÐ»Ð¾Ð¼ Ð³...|http://03online.c...|        03online.com|
|ÐÐ¾ÑÐµÐ¼Ñ Ð½Ðµ...|http://03online.c...|        03online.com|
|ÐÐ¾ÑÐµÐ¼Ñ Ð½Ðµ...|http://03online.c...|        03online.com|
|Ð ÐµÐ±ÐµÐ½Ð¾Ðº Ñ...|http://03online.c...|        03online.com|
|Ð¨ÑÐ¼ Ð² Ð¿ÑÐ°Ð...|http://03online.c...|        03online.com|
|Ð§Ð¸ÑÑÑÐ¹ ÐºÐ»...|http://08.od.ua/b...|            08.od.ua|
|Ð£ÐºÑÐ¾Ð¿ÑÑÐ¾Ñ...|http://08.od.ua/k...|            08.od.ua|
|ÐÐ° Ð¾Ð»ÑÐ³Ð¸Ðµ...|http://08.od.ua/n...|            08.od.ua|
|Orcead, Ð¾Ð¾Ð¾ Ð²...|htt

In [10]:
#rec_rdd_df.rdd.first()

## Sample Case

In [10]:
data = sc.textFile("s3a://galvanize.dsi.capstone.alex/lda_data/sample_lda_data.txt")

In [11]:
data.collect()

[u'1 2 6 0 2 3 1 1 0 0 3',
 u'1 3 0 1 3 0 0 2 0 0 1',
 u'1 4 1 0 0 4 9 0 1 2 0',
 u'2 1 0 3 0 0 5 0 2 3 9',
 u'3 1 1 9 3 0 2 0 0 1 3',
 u'4 2 0 3 4 5 1 1 1 4 0',
 u'2 1 0 3 0 0 5 0 2 2 9',
 u'1 1 1 9 2 1 2 0 0 1 3',
 u'4 4 0 3 4 2 1 3 0 0 0',
 u'2 8 2 0 3 0 2 0 2 7 2',
 u'1 1 1 9 0 2 2 0 0 3 3',
 u'4 1 0 0 4 5 1 3 0 1 0']

In [12]:
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
parsedData.collect()

[DenseVector([1.0, 2.0, 6.0, 0.0, 2.0, 3.0, 1.0, 1.0, 0.0, 0.0, 3.0]),
 DenseVector([1.0, 3.0, 0.0, 1.0, 3.0, 0.0, 0.0, 2.0, 0.0, 0.0, 1.0]),
 DenseVector([1.0, 4.0, 1.0, 0.0, 0.0, 4.0, 9.0, 0.0, 1.0, 2.0, 0.0]),
 DenseVector([2.0, 1.0, 0.0, 3.0, 0.0, 0.0, 5.0, 0.0, 2.0, 3.0, 9.0]),
 DenseVector([3.0, 1.0, 1.0, 9.0, 3.0, 0.0, 2.0, 0.0, 0.0, 1.0, 3.0]),
 DenseVector([4.0, 2.0, 0.0, 3.0, 4.0, 5.0, 1.0, 1.0, 1.0, 4.0, 0.0]),
 DenseVector([2.0, 1.0, 0.0, 3.0, 0.0, 0.0, 5.0, 0.0, 2.0, 2.0, 9.0]),
 DenseVector([1.0, 1.0, 1.0, 9.0, 2.0, 1.0, 2.0, 0.0, 0.0, 1.0, 3.0]),
 DenseVector([4.0, 4.0, 0.0, 3.0, 4.0, 2.0, 1.0, 3.0, 0.0, 0.0, 0.0]),
 DenseVector([2.0, 8.0, 2.0, 0.0, 3.0, 0.0, 2.0, 0.0, 2.0, 7.0, 2.0]),
 DenseVector([1.0, 1.0, 1.0, 9.0, 0.0, 2.0, 2.0, 0.0, 0.0, 3.0, 3.0]),
 DenseVector([4.0, 1.0, 0.0, 0.0, 4.0, 5.0, 1.0, 3.0, 0.0, 1.0, 0.0])]

In [13]:
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
corpus.take(5)

[[0, DenseVector([1.0, 2.0, 6.0, 0.0, 2.0, 3.0, 1.0, 1.0, 0.0, 0.0, 3.0])],
 [1, DenseVector([1.0, 3.0, 0.0, 1.0, 3.0, 0.0, 0.0, 2.0, 0.0, 0.0, 1.0])],
 [2, DenseVector([1.0, 4.0, 1.0, 0.0, 0.0, 4.0, 9.0, 0.0, 1.0, 2.0, 0.0])],
 [3, DenseVector([2.0, 1.0, 0.0, 3.0, 0.0, 0.0, 5.0, 0.0, 2.0, 3.0, 9.0])],
 [4, DenseVector([3.0, 1.0, 1.0, 9.0, 3.0, 0.0, 2.0, 0.0, 0.0, 1.0, 3.0])]]

In [24]:
type(corpus.first()[1])

pyspark.mllib.linalg.DenseVector

In [14]:
lda_Model1 = LDA.train(corpus, k=3)

In [15]:
# lda_Model1 = LDA.train(corpus, k=3)
# lda_model2 = LDA.train(corp_test, k=3)
topics = lda_Model1.topicsMatrix()

In [16]:
topics

array([[  3.83969302,  13.04572362,   9.11458336],
       [  3.057827  ,   5.71873325,  20.22343975],
       [  1.89343464,   2.26665728,   7.83990808],
       [ 15.11272915,  18.25108892,   6.63618193],
       [  1.63769315,  14.23069316,   9.13161369],
       [  1.78939897,  12.87412873,   7.3364723 ],
       [ 23.34131878,   3.08342094,   4.57526028],
       [  0.49365112,   7.7192262 ,   1.78712268],
       [  4.96028826,   0.65950383,   2.38020791],
       [  4.78420866,   5.56305796,  13.65273338],
       [ 28.07476574,   2.26011716,   2.6651171 ]])

## Crawl LDA

In [11]:
num_topics = 100
max_iterations = 100
vocab_size = 1000

In [13]:
#doc_idx = np.arange(rec_rdd_df.count())

In [20]:
rec_rdd_df.select(monotonically_increasing_id().alias("rowId"),"*")
rec_rdd_df = rec_rdd_df.withColumn("doc_id", lit(monotonically_increasing_id().alias("rowId")))
rec_rdd_df.show()

+--------------------+--------------------+--------------------+------+
|            contents|                 url|             website|doc_id|
+--------------------+--------------------+--------------------+------+
|çç
çç	ç...|http://007zhenren...|007zhenrenyuleche...|     0|
|æ¿æ©ç´ é£â§ç¶...|http://0289646723...|0289646723.tranew...|     1|
|ÐÐ¾Ð¿ÑÐ¾ÑÑ Ð¿...|http://03online.c...|        03online.com|     2|
|Ð Ð²Ð¸Ð±ÑÐ°ÑÐ¸...|http://03online.c...|        03online.com|     3|
|ÐÐµÑÐµÐ»Ð¾Ð¼ Ð³...|http://03online.c...|        03online.com|     4|
|ÐÐ¾ÑÐµÐ¼Ñ Ð½Ðµ...|http://03online.c...|        03online.com|     5|
|ÐÐ¾ÑÐµÐ¼Ñ Ð½Ðµ...|http://03online.c...|        03online.com|     6|
|Ð ÐµÐ±ÐµÐ½Ð¾Ðº Ñ...|http://03online.c...|        03online.com|     7|
|Ð¨ÑÐ¼ Ð² Ð¿ÑÐ°Ð...|http://03online.c...|        03online.com|     8|
|Ð§Ð¸ÑÑÑÐ¹ ÐºÐ»...|http://08.od.ua/b...|            08.od.ua|     9|
|Ð£ÐºÑÐ¾Ð¿ÑÑÐ¾Ñ...|http://08.od.ua/k...|            08.od.ua|

In [14]:
tokenizer = RegexTokenizer(inputCol="contents", outputCol="tokens")
tokens_df = tokenizer.transform(rec_rdd_df)
tokens_df.show()

+--------------------+--------------------+--------------------+--------------------+
|            contents|                 url|             website|              tokens|
+--------------------+--------------------+--------------------+--------------------+
|çç
çç	ç...|http://007zhenren...|007zhenrenyuleche...|[çç, çç, ...|
|æ¿æ©ç´ é£â§ç¶...|http://0289646723...|0289646723.tranew...|[æ¿æ©ç´ é£â§ç...|
|ÐÐ¾Ð¿ÑÐ¾ÑÑ Ð¿...|http://03online.c...|        03online.com|[ðð¾ð¿ñð¾ññ, ...|
|Ð Ð²Ð¸Ð±ÑÐ°ÑÐ¸...|http://03online.c...|        03online.com|[ð, ð²ð¸ð±ñð°ñ...|
|ÐÐµÑÐµÐ»Ð¾Ð¼ Ð³...|http://03online.c...|        03online.com|[ððµñðµð»ð¾ð¼, ...|
|ÐÐ¾ÑÐµÐ¼Ñ Ð½Ðµ...|http://03online.c...|        03online.com|[ðð¾ñðµð¼ñ, ð½...|
|ÐÐ¾ÑÐµÐ¼Ñ Ð½Ðµ...|http://03online.c...|        03online.com|[ðð¾ñðµð¼ñ, ð½...|
|Ð ÐµÐ±ÐµÐ½Ð¾Ðº Ñ...|http://03online.c...|        03online.com|[ð ðµð±ðµð½ð¾ðº, ...|
|Ð¨ÑÐ¼ Ð² Ð¿ÑÐ°Ð...|http://03online.c...|        03o

## CountVectorizing

In [15]:
#cv_model = CountVectorizer(inputCol="tokens", outputCol="cvectors")
cv_model = CountVectorizer(inputCol="tokens", outputCol="cvectors", vocabSize=vocab_size)

cv_df = cv_model.fit(tokens_df) ## takes long time
mbf = 2.0 / max_iterations + 1.0 / tokens_df.count() #MiniBatchFraction
cvectors = cv_df.transform(tokens_df)

In [16]:
cvectors.printSchema()
cvectors.show()

root
 |-- contents: string (nullable = true)
 |-- url: string (nullable = true)
 |-- website: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- cvectors: vector (nullable = true)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|            contents|                 url|             website|              tokens|            cvectors|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|çç
çç	ç...|http://007zhenren...|007zhenrenyuleche...|[çç, çç, ...|(1000,[6,7,10,15,...|
|æ¿æ©ç´ é£â§ç¶...|http://0289646723...|0289646723.tranew...|[æ¿æ©ç´ é£â§ç...|(1000,[7,206],[1....|
|ÐÐ¾Ð¿ÑÐ¾ÑÑ Ð¿...|http://03online.c...|        03online.com|[ðð¾ð¿ñð¾ññ, ...|(1000,[3,22,24,33...|
|Ð Ð²Ð¸Ð±ÑÐ°ÑÐ¸...|http://03online.c...|        03online.com|[ð, ð²ð¸ð±ñð°ñ...|(1000,[3,22,24,33...|
|ÐÐµÑ

In [17]:
from pyspark.sql.types import Row
from pyspark.sql.functions import *

#sv_df = cvectors.select("website", "cvectors")
sv_df = cvectors.select("cvectors")
dv_df = sv_df.rdd.map(lambda x: Row(cvectors=DenseVector(x[0].toArray())))
# dv_df = sv_df.rdd.map(lambda x: Row(cvectors=DenseVector(x[0].toArray()))
#                      if (len(x)>1 and hasattr(x[1], "toArray"))
#                      else Row(website=None, cvectors=DenseVector([])))
#df3 = sqlContext.createDataFrame(rdd)

In [18]:
type(dv_df.take(3)[0][0])
dv_rdd = dv_df.map(lambda x: x[0])

In [19]:
#dv_rdd.first()

In [20]:
corp = dv_rdd.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()

In [38]:
corp.first()

[0,
 DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.

In [31]:
#corpus = cvectors.select("cvectors").rdd.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()

In [29]:
type(corp)

pyspark.rdd.PipelinedRDD

In [40]:
first = corp.first()

In [41]:
type(first[1])

pyspark.mllib.linalg.DenseVector

In [21]:
## Java Error
crawl_lda_model = LDA.train(corp, k=5)
## EMR Resize

In [None]:
avg_log_likelihood = crawl_lda_model.ldaModel.logLikelihood / documents.count()


In [32]:
crawl_lda_model.describeTopics(maxTermsPerTopic=10)

[([3, 0, 1, 2, 4, 9, 5, 6, 12, 14],
  [0.05816715366103097,
   0.03593556334005401,
   0.02985974783742801,
   0.021764975593566476,
   0.018787736877533403,
   0.016451887234211687,
   0.015168414688729289,
   0.014303760992291354,
   0.01344658687076038,
   0.011983930369742013]),
 ([7, 21, 27, 3, 38, 6, 13, 22, 11, 79],
  [0.0610064939130384,
   0.036315389136648314,
   0.02910561006964736,
   0.017761244065817555,
   0.015727511321842687,
   0.01512840310022241,
   0.013669566021705286,
   0.012924358426367551,
   0.012695828506916992,
   0.012610700283635073]),
 ([8, 3, 24, 32, 33, 37, 11, 5, 25, 49],
  [0.0819338506060203,
   0.05068591747885936,
   0.03173344258674522,
   0.026007241701176086,
   0.02539403484776531,
   0.023715549881750123,
   0.02338085126901994,
   0.019914425767366593,
   0.01899159772290908,
   0.017335130026515457]),
 ([1, 2, 0, 7, 4, 5, 9, 17, 6, 11],
  [0.029638104431959793,
   0.026422606647319827,
   0.023803867376204175,
   0.02205561230990783,
   0.0

In [None]:
cv_df.vocabulary