In [25]:
import calendar
import csv
import io
import os
import random
import time
from collections import Counter

import matplotlib.pyplot as plt
import pandas as pd
import pyspark as ps

from data_prep import parse_line

# Local Spark session running on 4 threads; getOrCreate() hands back the
# session already attached to this kernel if there is one.
spark = (
    ps.sql.SparkSession.builder
    .master("local[4]")
    .appName("nathanscope")
    .getOrCreate()
)

In [2]:
# SparkContext of the session above; used below for the RDD API (sc.textFile).
sc = spark.sparkContext

In [3]:
# Column order of the review TSV files. `cols` maps each column name to its
# position so that rows (plain tuples) can be indexed by name.
col_names = [
    'marketplace',
    'customer_id',
    'review_id',
    'product_id',
    'product_parent',
    'product_title',
    'product_category',
    'star_rating',
    'helpful_votes',
    'total_votes',
    'vine',
    'verified_purchase',
    'review_headline',
    'review_body',
    'review_date',
]
cols = {}
for i, name in enumerate(col_names):
    # Echo "index: name" so the positions are visible in the notebook output.
    print(str(i) + ': ' + name)
    cols[name] = i

0: marketplace
1: customer_id
2: review_id
3: product_id
4: product_parent
5: product_title
6: product_category
7: star_rating
8: helpful_votes
9: total_votes
10: vine
11: verified_purchase
12: review_headline
13: review_body
14: review_date


In [4]:
def casting_function(row):
    """Cast one tab-split review record to typed fields.

    Parameters
    ----------
    row : sequence of str
        The fields of one TSV line, in the 15-column review order.

    Returns
    -------
    tuple
        A 15-tuple in which star_rating, helpful_votes and total_votes are
        converted to int, or an empty tuple for the header line and for any
        record that does not have exactly 15 fields.

    Raises
    ------
    ValueError
        If one of the three numeric fields of a 15-field record is not a
        valid integer literal.
    """
    # Header lines and malformed records are both reported as an empty tuple.
    # Malformed records used to come back as len(row) (an int), which made the
    # downstream `len(x) == 15` filter raise TypeError.
    if not row or row[0] == 'marketplace' or len(row) != 15:
        return ()

    (marketplace, customer_id, review_id, product_id, product_parent,
     product_title, product_category, star_rating, helpful_votes,
     total_votes, vine, verified_purchase, review_headline, review_body,
     review_date) = row

    return (
        marketplace,
        str(customer_id),
        str(review_id),
        str(product_id),
        product_parent,
        str(product_title),
        str(product_category),
        int(star_rating),
        int(helpful_votes),
        int(total_votes),
        vine,
        verified_purchase,
        str(review_headline),
        str(review_body),
        review_date,
    )




In [5]:
def time_cast(row):
    """Replace the review date (field 14) with days since the Unix epoch.

    Parameters
    ----------
    row : sequence
        A record whose element 14 is a date string formatted as YYYY-MM-DD.

    Returns
    -------
    tuple
        A copy of `row` in which element 14 is the number of days between
        1970-01-01 and the review date, as a float (e.g. 16678.0).

    Raises
    ------
    ValueError
        If element 14 does not match the YYYY-MM-DD pattern.
    """
    pattern = '%Y-%m-%d'
    fields = list(row)
    # timegm treats the parsed date as UTC, so the day number is the same on
    # every machine. mktime used the local time zone, which produced
    # fractional or shifted day numbers anywhere off UTC.
    fields[14] = calendar.timegm(time.strptime(fields[14], pattern)) / 86400
    return tuple(fields)

In [6]:
def toCSVLine(data):
    """Serialise an iterable of values as one CSV record (no trailing newline).

    Every value is converted with str() and then written through the csv
    module, so fields containing commas, double quotes or line breaks are
    quoted and escaped instead of corrupting the column layout. Records made
    only of plain fields are returned exactly as a simple ','.join would
    produce them.

    Note: a record consisting of a single empty string is written as `""`,
    which is how the csv module distinguishes it from an empty record.

    Parameters
    ----------
    data : iterable
        The field values of one record.

    Returns
    -------
    str
        The CSV-encoded record.
    """
    terminator = '\r\n'
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator=terminator)
    writer.writerow([str(d) for d in data])
    # The writer always appends the line terminator; callers expect a bare line.
    return buffer.getvalue()[:-len(terminator)]

In [7]:
# Turn the public HTTPS links listed in data/S3_Repos.txt (one per line) into
# s3:// URIs for the same objects.
repos = []
# Read-only access is enough, and the context manager closes the handle
# (the original opened the file "r+" and never closed it).
with open("data/S3_Repos.txt", "r") as file:
    for line in file:
        line = line.replace('https://s3.amazonaws.com/amazon-reviews-pds/tsv/','')
        line = line.replace('\n','')
        # A blank line would otherwise become the bare bucket prefix, which
        # is not a single review file.
        if not line.strip():
            continue
        repos.append('s3://amazon-reviews-pds/tsv/'+line)

In [8]:
# Spot-check the second entry: this is the file the next cell loads.
repos[1]

's3://amazon-reviews-pds/tsv/amazon_reviews_us_Watches_v1_00.tsv.gz'

In [9]:
# Load the file at repos[1], split every line on tabs and cast the fields.
# NOTE(review): despite the variable name, repos[1] is the Watches file
# according to the value displayed above.
rdd_books = (
    sc.textFile(repos[1])
    .map(lambda line: line.split("\t"))
    .map(casting_function)
)

In [10]:
# Keep only fully cast 15-field records, then convert the review date to a
# day number. The isinstance check matters because casting_function may hand
# back a non-tuple marker (an int) for malformed lines, and len() on an int
# raises TypeError.
# NOTE(review): this cell rebinds rdd_books to its own output, so running it
# twice applies time_cast to already-converted rows and fails.
rdd_books = (
    rdd_books
    .filter(lambda x: isinstance(x, tuple) and len(x) == 15)
    .map(time_cast)
)

In [11]:
# 1-star reviews that received more than 5 votes in total. Drop the
# total_votes condition to keep every 1-star review instead.
rdd_sample = rdd_books.filter(
    lambda row: row[cols['star_rating']] == 1 and row[cols['total_votes']] > 5
)

# Number of reviews per star rating over the whole data set: one
# single-entry Counter per row, merged pairwise.
star_counts = (
    rdd_books
    .map(lambda row: Counter({row[cols['star_rating']]: 1}))
    .reduce(lambda left, right: left + right)
)

rdd_sample.count()

In [12]:
# Pull at most 10,000 sample rows to the driver; this frame is written to disk
# below to estimate how many bytes a row takes as CSV.
tiny_df = pd.DataFrame(rdd_sample.take(10000))
tiny_df

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
0,US,18949762,R12U0031MS8AZU,B0122MC0BA,267006201,LNI Fashion Stainless Steel Band Watch Mk,Watches,1,6,8,N,Y,Four Stars,"When I first saw the watch, I was so happy not...",16678.0
1,US,21005490,R3JFUP4ZFPHLZ8,B002SSUQFG,354933056,Seiko Men's SNK809 Seiko 5 Automatic Stainless...,Watches,1,0,6,N,N,This watch will not allow for second hand sett...,This watch will not allow for second hand sett...,16678.0
2,US,45416451,R29WC4B3ACC3RZ,B004JY0KP8,815774729,Seiko Men's SNDC31 Classic Stainless Steel Chr...,Watches,1,2,8,N,Y,One Star,Broke in two weeks,16678.0
3,US,14870011,R6F9VY91ADPLA,B002ZLE41I,118278818,Grand Seiko Wristwatch Sbgx061 Mens,Watches,1,8,30,N,N,NOT a GS,NOT a Grand Seiko - do not buy at this price.<...,16678.0
4,US,6402475,RONNVK5TAN4U6,B00KMAKALC,600186626,Seiko Men's SNE335 Dress Solar Analog Display ...,Watches,1,1,6,N,Y,One Star,To big and bulky for a small arm. RETURNED,16676.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,US,13320587,R1MK97RMOMP843,B0006AAS5Q,975113005,Invicta Men's 9212 Speedway Analog Japanese Qu...,Watches,1,5,11,N,Y,Looks good - but deffective.,Watch came deffective. Chronograph functions a...,14549.0
9996,US,13849268,R3EX9UKKMIJ4AZ,B000BDDF4Y,799604616,Seiko Men's Two-Tone Blue Dial Perpetual Calen...,Watches,1,4,9,N,Y,Poor Quality,"After wearing this just a couple months, the w...",14548.0
9997,US,36039892,R2YTVMRUJSZFQE,B001RNOBDG,921932490,Timex Unisex T5K242 1440 Sports Digital Sport ...,Watches,1,12,17,N,Y,Timex Midsize 1440 Sports Digital Sport Resin ...,"It does not say it but it is a women's watch, ...",14547.0
9998,US,41636887,R2BZHK7U09MFGF,B002IAXAAW,552184232,Android Men's AD430BKK Hydraumatic Chronograph...,Watches,1,1,7,N,Y,Hydraumatic - too Sharp - ouch!!!,This is a nice design but the openings in the ...,14547.0


In [13]:
# Write the probe frame so its on-disk size can be measured.
# NOTE(review): this call writes the index column (pandas default), while the
# final export passes index=False, so the size estimate is slightly high.
tiny_df.to_csv('data/size_check.csv')

In [14]:
# Size in bytes of the probe CSV written from tiny_df.
tenK_size = os.path.getsize('data/size_check.csv')

In [15]:
# Number of rows in the filtered sample (count() runs a Spark job).
total_rows = rdd_sample.count()
total_rows

11402

In [16]:
# Target size of the exported CSV, in bytes.
optimal_bytes = 90000000
# How many probe-sized files fit into the target size.
tenK_scale = optimal_bytes/tenK_size
# Row budget for the export.
# NOTE(review): the factor 10000 assumes the probe frame really holds 10,000
# rows; take(10000) yields fewer when the sample is smaller — confirm.
optimal_rows = 10000*tenK_scale


In [17]:
# Keep the export within the optimal_rows budget: when the sample is larger
# than the budget, thin it out to roughly optimal_rows rows first.
if total_rows >= optimal_rows:
    scale = optimal_rows/total_rows
    # Seeded sampling without replacement. The previous unseeded
    # random.random() filter was re-evaluated on every Spark action, so the
    # later count and the collected rows could describe different subsets.
    rdd_sample = rdd_sample.sample(False, scale, seed=42)

# `temp` is built on both paths. It used to be assigned only when no
# down-sampling was needed, so the export cell raised NameError otherwise.
temp = pd.DataFrame(rdd_sample.collect())

In [18]:
# Recount after the size check, which may have down-sampled rdd_sample.
total_rows = rdd_sample.count()
total_rows

11402

In [19]:
# Show the row budget derived from the probe file size.
optimal_rows

131685.21965094638

In [20]:
# Show the current sample size for comparison with optimal_rows.
total_rows

11402

In [29]:
def make_parse(row):
    """Return `row` as a tuple with one extra trailing field.

    The extra field is the result of parse_line applied to element 13 of the
    record (the review_body column).
    """
    fields = tuple(row)
    return fields + (parse_line(fields[13]),)

In [30]:
# Append the parse_line result for the review body to every sampled row.
test = rdd_sample.map(make_parse)

In [31]:
# Preview the first 10 rows; the parsed text is the last column (15).
pd.DataFrame(test.take(10))

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
0,US,18949762,R12U0031MS8AZU,B0122MC0BA,267006201,LNI Fashion Stainless Steel Band Watch Mk,Watches,1,6,8,N,Y,Four Stars,"When I first saw the watch, I was so happy not...",16678.0,first saw watch happy discover watch work disa...
1,US,21005490,R3JFUP4ZFPHLZ8,B002SSUQFG,354933056,Seiko Men's SNK809 Seiko 5 Automatic Stainless...,Watches,1,0,6,N,N,This watch will not allow for second hand sett...,This watch will not allow for second hand sett...,16678.0,watch allow second hand set two watch send cou...
2,US,45416451,R29WC4B3ACC3RZ,B004JY0KP8,815774729,Seiko Men's SNDC31 Classic Stainless Steel Chr...,Watches,1,2,8,N,Y,One Star,Broke in two weeks,16678.0,broke two week
3,US,14870011,R6F9VY91ADPLA,B002ZLE41I,118278818,Grand Seiko Wristwatch Sbgx061 Mens,Watches,1,8,30,N,N,NOT a GS,NOT a Grand Seiko - do not buy at this price.<...,16678.0,grand seiko buy br br know poster realizes wat...
4,US,6402475,RONNVK5TAN4U6,B00KMAKALC,600186626,Seiko Men's SNE335 Dress Solar Analog Display ...,Watches,1,1,6,N,Y,One Star,To big and bulky for a small arm. RETURNED,16676.0,big bulky small arm return
5,US,43970363,R20YTVNH28TOAI,B00PGPDUC4,394590494,Invicta Men's 16198 Russian Diver Analog Displ...,Watches,1,3,7,N,N,One Star,LOL &#34;Adventurously designed&#34;? Jesus Ch...,16676.0,lol adventurously design jesus christ thing ugly
6,US,139091,R3U9CP7W34OM8Z,B010EA7R5G,606372980,Invicta Men's 20303 Speedway Analog Display Ja...,Watches,1,0,7,N,Y,One Star,Watch is junk I sent it Back,16676.0,watch junk send back
7,US,225889,R2D8IMBVX3XCLF,B00U0XC1T0,31905742,Akribos XXIV Men's AK813YG Round Yellow Gold ...,Watches,1,14,20,N,Y,I bought in 30 june. But i just use ...,I bought in 30 june. But i just use watches in...,16676.0,buy june use watch july final becam water n st...
8,US,6353050,R1KHROYDLLS1ZB,B00O4UPK98,798202029,Nautica Men's NAD19518G NST101 Stainless Steel...,Watches,1,1,7,N,Y,It is my first Nautica and truly I did not lik...,This s watch is too big for my taste. It is my...,16675.0,watch big taste first nautica truly like quali...
9,US,49562841,R3K8S9R942KE9C,B001A5LKAS,616234998,Casio Men's WV58A-1AV,Watches,1,15,15,N,Y,Beware -- shipping with nearly-dead batteries,I've had several Casio watches with good exper...,16675.0,several casio watch good experience several ye...


In [None]:
# Export the collected sample without the pandas index.
# NOTE(review): `temp` is built from rdd_sample before make_parse is applied,
# so this file does not contain the parsed-text column — confirm intent.
# NOTE(review): the file name says "books" and "10vote", but the data loaded
# above is repos[1] (Watches) filtered on total_votes > 5 — confirm the name.
temp.to_csv('data/books_sample_1star_10vote.csv',index=False)

In [None]:
# Read the exported sample back from disk.
new_sample = pd.read_csv('data/books_sample_1star_10vote.csv')

In [None]:
# Display the full list of S3 URIs built from data/S3_Repos.txt.
repos