In [1]:
# Import all necessary libraries and setup the environment for matplotlib
%matplotlib inline
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.ml.feature import PCA
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import StructType, StructField, StringType,IntegerType, FloatType
from pyspark.sql.functions import udf

import numpy as np
import matplotlib.pyplot as plt

In [2]:
# Create (or reuse) the Spark session for this notebook.
spark = SparkSession.builder.appName("Assignment2").getOrCreate()

# Amazon US reviews, Wireless category: a gzipped tab-separated file with a header row.
# No schema is supplied, so every column is read as a string.
wireless_data = "../amazon_reviews_us_Wireless_v1_00.tsv.gz"
wireless = (
    spark.read
    .option("header", True)
    .option("sep", '\t')
    .csv(wireless_data)
)

In [3]:
# Show column names and types as read from the TSV; all columns are strings since no schema was supplied to the reader.
wireless.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [4]:
# Peek at one raw review row to see what the string-typed fields look like.
wireless.first()

Row(marketplace='US', customer_id='16414143', review_id='R3W4P9UBGNGH1U', product_id='B00YL0EKWE', product_parent='852431543', product_title='LG G4 Case Hard Transparent Slim Clear Cover for LG G4', product_category='Wireless', star_rating='2', helpful_votes='1', total_votes='3', vine='N', verified_purchase='Y', review_headline='Looks good, functions meh', review_body="2 issues  -  Once I turned on the circle apps and installed this case,  my battery drained twice as fast as usual.  I ended up turning off the circle apps, which kind of makes the case just a case...  with a hole in it.  Second,  the wireless charging doesn't work.  I have a Motorola 360 watch and a Qi charging pad. The watch charges fine but this case doesn't. But hey, it looks nice.", review_date='2015-08-31')

In [13]:
# Total number of review rows; keep it in a variable for later use and display it
# so the recorded output of this cell is reproducible on a fresh kernel.
NumOfReviews = wireless.count()
NumOfReviews

9002021

In [14]:
# Number of distinct customers who wrote at least one review.
wireless.select('customer_id').dropDuplicates().count()

5197905

In [15]:
# Number of distinct products that received at least one review.
wireless.select('product_id').dropDuplicates().count()

906592

In [33]:
# Reviews per customer, most prolific reviewers first; show the top one.
customer_review_count = (
    wireless.groupBy('customer_id')
    .count()
    .sort('count', ascending=False)
)
customer_review_count.head()

Row(customer_id='30208851', count=616)

In [34]:
# Ten customers with the most reviews.
customer_review_count.show(n=10)

+-----------+-----+
|customer_id|count|
+-----------+-----+
|   30208851|  616|
|   53037408|  493|
|   45070473|  437|
|   49266466|  435|
|   32038204|  427|
|   52870270|  353|
|   44834233|  330|
|   11995502|  319|
|   51346302|  318|
|   43856165|  316|
+-----------+-----+
only showing top 10 rows



In [38]:
# Median number of reviews per customer. A relative error of 0 asks Spark for the
# exact quantile; the result is a list with one entry per requested probability.
user_count_median = (
    wireless.groupBy('customer_id')
    .count()
    .approxQuantile('count', [0.5], 0)
)
user_count_median

[1.0]

In [35]:
# Reviews per product, most-reviewed products first; show the top one.
product_review_count = (
    wireless.groupBy('product_id')
    .count()
    .sort('count', ascending=False)
)
product_review_count.head()

Row(product_id='B009A5204K', count=10270)

In [36]:
# Ten products with the most reviews.
product_review_count.show(n=10)

+----------+-----+
|product_id|count|
+----------+-----+
|B009A5204K|10270|
|B0073FE1F0| 9946|
|B007FHX9OK| 9468|
|B0042FV2SI| 8866|
|B009SYZ8OC| 8801|
|B009USAJCC| 8055|
|B005X1Y7I2| 8023|
|B002BBJMO6| 7864|
|B0093QER4C| 7112|
|B003PPGOC0| 7087|
+----------+-----+
only showing top 10 rows



In [39]:
# Median number of reviews per product. A relative error of 0 asks Spark for the
# exact quantile; the result is a list with one entry per requested probability.
product_count_median = (
    wireless.groupBy('product_id')
    .count()
    .approxQuantile('count', [0.5], 0)
)
product_count_median

[1.0]

In [4]:
import nltk
def sent_TokenizeFunct(x):
    """Count the sentences in a piece of review text.

    Args:
        x: the review text. ``None`` or an empty string (e.g. a missing
            ``review_body``) is treated as having no sentences.

    Returns:
        int: the number of sentences returned by ``nltk.sent_tokenize``.
    """
    if not x:
        return 0
    # Tokenize the text as written, not a lowercased copy: NLTK's Punkt tokenizer
    # uses the capitalisation of the following word to decide whether an
    # ellipsis or abbreviation ends a sentence, so lowercasing hides boundaries
    # and undercounts sentences.
    return len(nltk.sent_tokenize(x))

In [5]:
# Wrap the sentence counter as a Spark UDF that yields an integer column.
slen = udf(f=sent_TokenizeFunct, returnType=IntegerType())

In [8]:
# Attach the per-review sentence count computed by the `slen` UDF.
wireless_with_TokenLength = wireless.withColumn("TokenLength", slen("review_body"))
# Keep only reviews with more than one sentence (a missing/empty body counts as 0,
# so those rows are dropped too). The predicate must be a complete SQL expression.
filtered_wireles = wireless_with_TokenLength.filter("TokenLength > 1")
filtered_wireles.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|TokenLength|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+-----------+
|         US|   16414143|R3W4P9UBGNGH1U|B00YL0EKWE|     852431543|LG G4 Case Hard T...|        Wireless|          2|            1|          3|   N|                Y|Looks good, funct...|2 issues  -  Once...| 2015-08-31|          6|
|         US|   50800750|R15V54KBMTQWAY|B00XK95RPQ|     516894650|Selfie

In [46]:
# NOTE(review): abandoned earlier attempt at attaching sentence counts by building a
# separate DataFrame and combining it with `wireless`. It is superseded by applying the
# `slen` UDF with `withColumn`; kept only for reference and could be deleted.
# Why these variants could not work as written:
#   - `sentenceTokenizeRDD` is not defined in any visible cell;
#   - `wireless` has no "id" column (see its printSchema output), so there is no join key;
#   - `crossJoin` pairs every review with every count, multiplying the row count;
#   - `withColumn` with a column taken from a different DataFrame generally fails to
#     resolve in Spark, since DataFrames carry no shared row order.
#TokenLength_rdd = sentenceTokenizeRDD.map(lambda x: len(x))
#TokenLength_df = TokenLength_rdd.map(lambda x: (x,)).toDF()
#TokenLength_df.select("_1").take(10)


#from pyspark.sql.functions import col
#df3 = wireless.join(TokenLength_df, "id", "outer").drop("id") 
#wireless_with_TokenLength = wireless.crossJoin(TokenLength_df.select("_1"))
#wireless_with_TokenLength.take(5)
#wireless_with_TokenLength = wireless.withColumn('TokenLength', TokenLength_df['_1'])
#wireless_with_TokenLength.take(5)