In [1]:
#
# Books
#

import pandas as pd
import numpy as np
import scipy as sp
import matplotlib.pyplot as plt
from datetime import datetime

import findspark
findspark.init()
import pyspark

from pyspark.sql.types import DateType
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import *
import pyspark.sql.functions as fn
from pyspark.sql import Window
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql import SQLContext

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

DATA_DIR = 'data/'

<br>
<br>
<br>

In [2]:
### IMPORTING & CLEANING META DATA

meta_products = spark.read.json(DATA_DIR+"meta_Books.json")

In [4]:
meta_products.columns

['_corrupt_record',
 'asin',
 'brand',
 'categories',
 'description',
 'imUrl',
 'price',
 'related',
 'salesRank',
 'title']

In [5]:
# This will extract only the features and turn them into more readable features.
# Features removed : corruptRecord, imURL, related
# This will extract only the features and turn them into more readable features.
# Filter salesRank = None because this will lead to problems for the writing in parquet
# Features removed : corruptRecord, imURL, related
#data_cleaned = meta_products.rdd.flatMap(lambda r: [(r[1], r[2], 'Books', r[4], r[6],  r[9] )])
data_cleaned = meta_products.rdd.flatMap(lambda r: [(r[1], r[4], r[6], r[9] )])

In [6]:
# Define the StructType to define the DataFrame that we'll create with the previously extracted rdd table
schema = StructType([
    StructField("asin", StringType(), True),
    #StructField("brand", StringType(), True),
    #StructField("category", StringType(), True),
    StructField("description", StringType(), True),
    StructField("price", FloatType(), True),
    #StructField("salesRank", IntegerType(), True),
    StructField("title", StringType(), True)
])

In [7]:
# Transform the RDD data into DataFrame (we'll then be able to store it in Parquet)
datacleaned_DF = spark.createDataFrame(data_cleaned, schema=schema)

In [None]:
#Save into parquet to save time in the next times
datacleaned_DF.write.mode('overwrite').parquet("nobrand_meta_Books.parquet")

In [None]:
# Read from the parquet data
datacleaned_DF = spark.read.parquet("meta_Books.parquet")

In [8]:
datacleaned_DF.columns

['asin', 'description', 'price', 'title']

In [9]:
keywords = [" global warming", " solar energy", " recycling ", " pollution ", "solar power", " endangered species", "air pollution", \
" water pollution", " wind energy", " climate change", " wind power", " recycle ", " deforestation", " greenhouse effect", "environment", \
" sustainability ", " natural resources", "alternative energy", " climate ", "global warming", "renewable energy", " ecology", "composting", \
" carbon footprint", " bio ", " biosphere ", " renewable "]

In [10]:
# Filter with title and description not equal to None
# We will then be able to test if those features contains words defined in the keyword vector
# The keyword vector represents the thema that we want : ecology, bio etc...
filter_products_bio = datacleaned_DF.rdd.filter(lambda r: (r[3] != None) &  (r[1] != None)) \
                    .filter(lambda r: (any(word in r[3].lower() for word in keywords)) | (any(word in r[1].lower() for word in keywords)) ) 


In [11]:
# Transform the RDD data into DataFrame (we'll then be able work and join with review data)
DF_filter_products_bio = spark.createDataFrame(filter_products_bio, samplingRatio=0.2)


In [12]:
DF_filter_products_bio

DataFrame[asin: string, description: string, price: double, title: string]

<br>
<br>
<br>

In [None]:
### IMPORTING & CLEANING REVIEWS DATA

reviews = spark.read.json(DATA_DIR+"reviews_Books.json")

In [None]:
reviews.columns

In [None]:
#We remove the review text, reviewer name and summary
reviews_cleaned = reviews.rdd.flatMap(lambda r: [(r[0], r[1], r[2], r[4], r[5],r[8])])

In [None]:
# Define the StructType to define the DataFrame that we'll create with the previously extracted rdd table
schema = StructType([
    StructField("asin", StringType(), True),
    StructField("helpful", ArrayType(IntegerType()), True),
    StructField("overall", FloatType(), True),
    StructField("reviewTime", StringType(), True),
    StructField("reviewerID", StringType(), True),
    StructField("unixReviewTime", StringType(), True),
])

In [None]:
# Transform the RDD data into DataFrame (we'll then be able to store it in Parquet)
reviews_cleaned_DF = spark.createDataFrame(reviews_cleaned, schema=schema)

In [None]:
#Save into parquet to save time in the next times
reviews_cleaned_DF.write.mode('overwrite').parquet("reviews_Books.parquet")

In [13]:
reviews_cleaned_DF = spark.read.parquet("reviews_Books.parquet")

In [14]:
reviews_cleaned_DF

DataFrame[asin: string, helpful: array<int>, overall: float, reviewTime: string, reviewerID: string, unixReviewTime: string]

<br>
<br>
<br>

In [15]:
### Join Reviews and Metadata
review_product_join = DF_filter_products_bio.join(reviews_cleaned_DF, ['asin'])

In [16]:
review_product_join.columns

['asin',
 'description',
 'price',
 'title',
 'helpful',
 'overall',
 'reviewTime',
 'reviewerID',
 'unixReviewTime']

In [None]:
review_product_join = review_product_join.filter(review_product_join.brand.isNotNull())

In [None]:
review_product_join = review_product_join.filter(review_product_join.price.isNotNull())

In [None]:
#from pyspark.sql.functions import isnan, when, count, col

#review_product_join.select([count(when(col(c).isNull(), c)).alias(c) for c in review_product_join.columns]).show()
#review_product_join.select([count(when(isnan(c), c)).alias(c) for c in review_product_join.columns]).show()

In [17]:
review_product_join.write.mode('overwrite').parquet("joined_Books_test.parquet")