In [1]:
# To use Spark 1.6.3 in Jupyter Notebook we have to use Python 3.4
!python --version

Python 3.4.5 :: Anaconda custom (64-bit)


In [2]:
# USER to determine spark and dataset directories
import getpass
USER = getpass.getuser()
if USER == 'Tarmo':
    SPARK_DIR = 'C:/Users/Tarmo/Documents/Lausanne/CS-401_applied_data_analysis/spark/spark-1.6.3-bin-hadoop2.6'
else:
    SPARK_DIR = '/home/adam/EPFL_courses/spark-1.6.3-bin-hadoop2.6'
# Add your dirs here

In [3]:
SPARK_DIR

'C:/Users/Tarmo/Documents/Lausanne/CS-401_applied_data_analysis/spark/spark-1.6.3-bin-hadoop2.6'

### Import libraries
#### Spark libraries

In [4]:
import findspark
findspark.init(SPARK_DIR)

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import to_date, unix_timestamp, from_unixtime  #to_timestamp, pyspark 2.2 functiona 

#### The others

In [5]:
import pandas as pd
from scipy import stats
import numpy as np
import json
import re
import json 
import gzip
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

### Initialization of Spark and SQL context

In [6]:
sc = SparkContext()
sqlContext = SQLContext(sc)

sc.version

'1.6.3'

### Loading of dataset and metadata

In [5]:
DATASET_DIR = "/media/adam/B236CB1D36CAE209/Studia/ADA/reviews_Books_5.json"
METADATA_DIR = "/media/adam/B236CB1D36CAE209/Studia/ADA/meta_Books.json"

# Load the dataset and create RDDs
text_file = sc.textFile(DATASET_DIR)
# Convert previously read text file to json DataFrame
df = sqlContext.read.json(text_file)

# Load metadata for dataset and convert it to DataFrame
metadata = sc.textFile(METADATA_DIR)
metadata_df = sqlContext.read.json(metadata)

# Register DataFrames as tables to use those names in SQL-type queries
sqlContext.registerDataFrameAsTable(metadata_df, "metadata")
sqlContext.registerDataFrameAsTable(df, "dataset")

In [None]:
# Number of RDDs
text_file.getNumPartitions()

In [None]:
# Number of records in the dataset
text_file.count()

In [None]:
# Schema of the dataset
df.printSchema()

### Initial data processing

In [None]:
# Bunch of castings, reviewTime: string->date, unixReviewTime: int->timestamp
# We do this to be able to better filter and manipulate the data
df = df.withColumn('unixReviewTime', from_unixtime(df['unixReviewTime']))
df = df.withColumn('reviewTime', to_date(df['unixReviewTime']))
df = df.withColumn('unixReviewTime', df['unixReviewTime'].cast('timestamp'))
df.printSchema()

In [None]:
# Look at couple of records, just to be sure that we obtained what we wanted
df.select("reviewTime", 'reviewText', 'unixReviewTime').take(2)

### Average length of review per day

In [None]:
# Computing an average length of review per day
aTuple = (0, 0)
avg = df.select("reviewTime", 'reviewText').rdd.map(lambda row: (row.reviewTime, len(row.reviewText)))
avg = avg.aggregateByKey(aTuple, lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1]))
avg = avg.mapValues(lambda v: v[0]/v[1])
avg = avg.collect()

In [None]:
# Number of record
len(avg)

In [None]:
# Processing acquired data using Pandas
avg_len = pd.DataFrame(avg, columns=['Date', 'Avg_length'])
avg_len['Date'] = pd.to_datetime(avg_len['Date'])
avg_len.set_index('Date', inplace=True)
avg_len.sort_index(inplace=True)
avg_len.head()

In [None]:
# Save to file not to compute this one more time
avg_len.to_csv("avg_length_review_by_day.csv")

### Average number of reviews per month

In [None]:
monthly_data = avg_len.groupby(avg_len.index.to_period('M')).mean()
monthly_data.plot(figsize=(20,10))

### Average number of reviews between 2012 and 2013

In [None]:
monthly_data['2012':'2013'].plot(figsize=(20,10))

In [None]:
# Save processed data
monthly_data.to_csv("avg_length_review_by_month.csv")

In [None]:
# (Successful) attempt to filter by timestamp 
tmp = df.rdd.filter(lambda row: row.unixReviewTime > pd.to_datetime('2012-05')
                     and row.unixReviewTime < pd.to_datetime('2013'))
tmp.take(5)

### Number of reviews per each day

In [None]:
# Number of reviews per each day
number_of_reviews = df.rdd.map(lambda row: (row.reviewTime, 1)).reduceByKey(lambda a, b: a+b).collect()

In [None]:
# Processing the data
rev_num = pd.DataFrame(number_of_reviews, columns=['Date', 'Number of reviews'])
rev_num['Date'] = pd.to_datetime(rev_num['Date'])
rev_num.set_index('Date', inplace=True)
rev_num.sort_index(inplace=True)
rev_num.head()

In [None]:
# Save the data not to compute over and over
rev_num.to_csv("number_of_reviews_per_day.csv")

### Number of reviews per month

In [None]:
monthly_data_reviews = rev_num.groupby(rev_num.index.to_period('M')).sum()
monthly_data_reviews.plot(figsize=(20,10))

In [None]:
# Save the processed data
monthly_data_reviews.to_csv("number_of_reviews_per_month.csv")

### Number of reviews per book

In [None]:
number_of_reviews_per_book = df.rdd.map(lambda row: (row.asin, 1)).reduceByKey(lambda a, b: a+b).collect()
len(number_of_reviews_per_book)

In [None]:
# Processing the data using Pandas and saving it to csv file
df_number_of_reviews_per_book = pd.DataFrame(number_of_reviews_per_book, columns=['Book_id', 'Number of reviews'])
df_number_of_reviews_per_book.sort_values('Number of reviews', ascending=False, inplace = True)
df_number_of_reviews_per_book.to_csv("number_of_reviews_per_book.csv", index=False)
df_number_of_reviews_per_book.head(10)

In [None]:
# Histogram of numbers of reviews
plt.hist(np.array(df_number_of_reviews_per_book['Number of reviews'].values), bins = 1000)
plt.show()

In [None]:
# Basic statistics for this variable
stats.describe(df_number_of_reviews_per_book['Number of reviews'].values)

In [None]:
# Boxplot
sns.boxplot(df_number_of_reviews_per_book['Number of reviews'].values)

#### Top 30 books ranking based of number of reviews

In [8]:
top_books = df.rdd.map(lambda row: (row.asin, 1)).reduceByKey(lambda a, b: a+b).sortBy(lambda wc: -wc[1]).take(30)

top_books_df = sqlContext.createDataFrame(top_books, ['asin', 'rew_num'])
sqlContext.registerDataFrameAsTable(top_books_df, "top_books")
top_books_df.take(5)

In [15]:
# Join the dataset table with the metadata table to see titles of the most popular books
sqlContext.sql("select t.asin, m.title from metadata m join top_books t on m.asin=t.asin").collect()

[Row(asin='1940026016', title='The Atlantis Gene: A Thriller (The Origin Mystery, Book 1)'),
 Row(asin='0439023483', title='The Hunger Games (The Hunger Games, Book 1)'),
 Row(asin='0316206849', title="The Cuckoo's Calling"),
 Row(asin='1469984202', title='Wool - Omnibus Edition'),
 Row(asin='0141039280', title='The Help'),
 Row(asin='0385537859', title='Inferno'),
 Row(asin='038536315X', title='Sycamore Row'),
 Row(asin='0312853238', title="Ender's Game (The Ender Quintet)"),
 Row(asin='0007444117', title='Allegiant (Divergent, #3)'),
 Row(asin='0316055433', title='The Goldfinch: A Novel (Pulitzer Prize for Fiction)'),
 Row(asin='0007386648', title='Unbroken'),
 Row(asin='030758836X', title='Gone Girl'),
 Row(asin='0345803485', title='Fifty Shades of Grey: Book One of the Fifty Shades Trilogy'),
 Row(asin='0425263924', title='Entwined with You (Crossfire, Book 3)'),
 Row(asin='0857521012', title='The Light Between Oceans'),
 Row(asin='0061950726', title='Orphan Train: A Novel'),
 Row(

In [None]:
sqlContext.sql("select t.asin, m.title from metadata m join top_books t limit 10").collect()

### Two methods of filtering by ids of books

In [None]:
most_reviewed_books_id_top = df_number_of_reviews_per_book[:30]
most_reviewed_top = df.rdd.filter(lambda row: row.asin in list(most_reviewed_books_id_top.Book_id))
            .map(lambda row: (row.asin, row.reviewTime)).collect()

In [None]:
most_reviewed_top_2 = sqlContext.sql("select asin, reviewTime from dataset where asin in " + 
               str(tuple(most_reviewed_books_id_top.Book_id)))

### Processing the data for top30 books

In [None]:
# Processing the data using Pandas
most_reviewed_books_top30_df = pd.DataFrame(most_reviewed_top, columns=['asin', 'reviewTime'])
# Convert to datetime type
most_reviewed_books_top30_df['reviewTime'] = pd.to_datetime(most_reviewed_books_top30_df['reviewTime'])
# Assign number of review to compute the sum
most_reviewed_books_top30_df['Number_of_reviews'] = 1
# Create monthly period for aggregation purpose
most_reviewed_books_top30_df['Year-month'] = most_reviewed_books_top30_df['reviewTime'].dt.to_period('M')
most_reviewed_books_top30_df.head()

In [None]:
# Save data describing number of reviews per day for each book in top30
most_reviewed_books_top30_df.groupby(['asin', 'reviewTime']).sum()
                            .to_csv("number_of_reviews_per_day_top30_books.csv")

In [None]:
# Aggregating the data by month
m_rev_books_by_month = most_reviewed_books_top30_df.groupby(['asin', 'Year-month'], as_index=True).sum()
m_rev_books_by_month.to_csv("number_of_reviews_per_month_top30_books.csv")

### Multi-line plot for top30 books - timeseries of reviews per month

In [None]:
m_rev_books_by_month.unstack(level=0).to_csv("number_of_reviews_per_month_top30_books_UNSTACKED.csv")
m_rev_books_by_month.unstack(level=0).plot(figsize = (20,10))

### Average length and number of reviews per book

In [None]:
aTuple = (0, 0)
avg_len_review = df.select('asin', 'reviewText').rdd.map(lambda row: (row.asin, len(row.reviewText)))
avg_len_review = avg_len_review.aggregateByKey(aTuple, lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1]))
avg_len_review = avg_len_review.mapValues(lambda v: (v[0]/v[1], v[1]))
avg_len_review = avg_len_review.collect()

In [None]:
# Transformation of the data to be able to load it as a DataFrame 
avg_len_review = [(k, v1, v2) for k, (v1, v2) in avg_len_review]

In [None]:
# Processing and saving to file
avg_len_review_per_book_df = pd.DataFrame(avg_len_review, columns=['Book_id', 'Avg_len', 'number_of_reviews'])
avg_len_review_per_book_df.sort_values(['Avg_len', 'number_of_reviews'], ascending=False, inplace=True)
avg_len_review_per_book_df.to_csv("avg_length_and_number_of_reviews_per_book.csv", index=False)
avg_len_review_per_book_df.head(10)

In [None]:
# Spark context shutdown
sc.stop()