In [None]:
import os
from urllib.parse import quote_plus

import databricks.koalas as ks
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession

In [None]:
def mongo_credential(env_var, placeholder):
    '''
        Read one MongoDB credential from the environment.

        Args:
            env_var (string): name of the environment variable to read.
            placeholder (string): text returned as-is when the variable
                is unset or empty.
        Return:
            string: the variable's value percent-escaped with
                quote_plus, so that characters such as '@', ':' or '/'
                cannot break the URI; otherwise the placeholder.
    '''
    value = os.environ.get(env_var)
    return quote_plus(value) if value else placeholder

# Credentials come from MONGODB_USERNAME / MONGODB_PASSWORD so that they
# never have to be typed into (and saved with) the notebook. When the
# variables are not set the URI keeps the literal placeholders.
uri_db = 'mongodb+srv://{}:{}@bigdata.toqh2.mongodb.net'.format(
    mongo_credential('MONGODB_USERNAME', '<username>'),
    mongo_credential('MONGODB_PASSWORD', '<password>'))

# Maven coordinates of the MongoDB Spark connector, handed to
# 'spark.jars.packages' when the session is created.
spark_connector_uri = 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.7'

In [None]:
# Build (or reuse) the Spark session: local master, MongoDB input URI
# taken from uri_db, connector resolved through 'spark.jars.packages'.
session = (
    SparkSession.builder
    .master('local')
    .config('spark.mongodb.input.uri', uri_db)
    .config('spark.jars.packages', spark_connector_uri)
    .getOrCreate()
)

# Keep a handle on the SparkContext owned by the session.
context = session.sparkContext

In [None]:
def _load_collection(collection_name):
    '''
        Load one collection of the 'test' database as a Spark DataFrame.

        Args:
            collection_name (string): MongoDB collection to read.
        Return:
            DataFrame: the collection without the MongoDB '_id' column,
                which is dropped in order to avoid errors at runtime.
    '''
    frame = (
        session.read
        .format('com.mongodb.spark.sql.DefaultSource')
        .option('database', 'test')
        .option('collection', collection_name)
        .load()
    )
    return frame.drop('_id')

# One Spark DataFrame per collection contained in the database.
df_reviews = _load_collection('reviews')
df_meta = _load_collection('meta')

# Show the schema of both collections.
df_reviews.printSchema()
df_meta.printSchema()

In [None]:
# Extract the "Sports &amp; Outdoors" entry of the salesRank struct as a
# regular column while the data is still a Spark DataFrame. Doing it
# here keeps every rank on the row it was read from; attaching it later
# from a second Koalas frame would rely on two independently generated
# default indexes lining up and would require a join between the frames.
df_meta_ranked = df_meta.withColumn(
    'sales_rank_sports_etc',
    df_meta['salesRank']['Sports &amp; Outdoors'])

# Create two Koalas DataFrame from the Spark DataFrame objects.
kdf_reviews = ks.DataFrame(df_reviews)
kdf_meta = ks.DataFrame(df_meta_ranked)

# Stand-alone frame holding only the extracted ranks. It is no longer
# needed to build kdf_meta and is kept only so that any other code
# referring to this name keeps working.
array_ranks = df_meta.select('salesRank.Sports &amp; Outdoors').to_koalas()

# Allow operations that combine different DataFrame objects.
ks.set_option('compute.ops_on_diff_frames', True)

# Compute join on asin attribute.
kdf_merge = kdf_reviews.merge(kdf_meta, on='asin')

In [None]:
# Query #1: number of reviews per product, most reviewed first.
kdf_1 = (
    kdf_reviews
    .groupby('asin')
    .size()
    .alias('reviews_count_product')
    .sort_values(ascending=False)
    .reset_index()
)

# Export the first 100 rows (written with num_files=1).
kdf_1.head(100).to_csv(path='./to_csv/kdf_1.csv', num_files=1)

print("# I 100 prodotti con il maggior numero di recensioni #")
kdf_1.head(100)

In [None]:
# Query #2: number of reviews per reviewer, most active first.
kdf_2 = (
    kdf_reviews
    .groupby('reviewerID')
    .size()
    .alias('reviews_count_reviewer')
    .sort_values(ascending=False)
    .reset_index()
)

# Export the first 100 rows (written with num_files=1).
kdf_2.head(100).to_csv(path='./to_csv/kdf_2.csv', num_files=1)

print("# I 100 reviewer che hanno effettuato il maggior numero di recensioni #")
kdf_2.head(100)

In [None]:
# Query #3: number of reviews per brand, most reviewed first.
# Rows whose brand is the empty string or missing are left out.
kdf_3 = (
    kdf_merge[kdf_merge.brand != '']
    .dropna(subset=['brand'])
    .groupby('brand')
    .size()
    .alias('reviews_count_brand')
    .sort_values(ascending=False)
    .reset_index()
)

# Export the first 50 rows (written with num_files=1).
kdf_3.head(50).to_csv(path='./to_csv/kdf_3.csv', num_files=1)

print("# Le 50 marche i cui prodotti sono stati maggiormente recensiti #")
kdf_3.head(50)

In [None]:
# Query #4: mean product price per brand, most expensive first.
# Products whose brand is the empty string are excluded, exactly as in
# the other per-brand queries of this notebook; otherwise '' would be
# ranked as if it were a real brand. Rows with a missing brand or price
# are dropped as well.
kdf_4 = (
    kdf_meta[kdf_meta.brand != '']
    .dropna(subset=['brand', 'price'])
    .groupby('brand')['price']
    .mean()
    .alias('price_mean')
    .sort_values(ascending=False)
    .reset_index()
)

# Export the first 50 rows (written with num_files=1).
kdf_4.head(50).to_csv(path='./to_csv/kdf_4.csv', num_files=1)

print("# Le 50 marche i cui prodotti hanno un prezzo medio maggiore #")
kdf_4.head(50)

In [None]:
# Query #5: mean 'overall' rating per product.
kdf_5 = (
    kdf_reviews
    .groupby('asin')['overall']
    .mean()
    .alias('overall_mean_product')
    .sort_values(ascending=False)
    .reset_index()
)

# Bring in the per-product review count from kdf_1 and use it as the
# secondary sort key after the mean rating.
kdf_5 = kdf_5.merge(kdf_1, on='asin').sort_values(
    by=['overall_mean_product', 'reviews_count_product'],
    ascending=False,
)

# Export the first 100 rows (written with num_files=1).
kdf_5.head(100).to_csv(path='./to_csv/kdf_5.csv', num_files=1)

print("# I 100 prodotti con le migliori recensioni #")
kdf_5.head(100)

In [None]:
# Query #6: mean 'overall' rating per brand.
# Rows whose brand is the empty string or missing are left out.
kdf_6 = (
    kdf_merge[kdf_merge.brand != '']
    .dropna(subset=['brand'])
    .groupby('brand')['overall']
    .mean()
    .alias('overall_mean_brand')
    .sort_values(ascending=False)
    .reset_index()
)

# Bring in the per-brand review count from kdf_3 and use it as the
# secondary sort key after the mean rating.
kdf_6 = kdf_6.merge(kdf_3, on='brand').sort_values(
    by=['overall_mean_brand', 'reviews_count_brand'],
    ascending=False,
)

# Export the first 100 rows (written with num_files=1).
kdf_6.head(100).to_csv(path='./to_csv/kdf_6.csv', num_files=1)

print("# Le 100 marche con le migliori recensioni #")
kdf_6.head(100)

In [None]:
# Query #7 - #8
def get_helpful_rate(x):
    '''
        Compute a helpfulness rate as a percentage.

        Args:
            x (sequence): pair of numbers; x[0] is the numerator (the
                positive votes, stored by this notebook as
                'helpful_pos') and x[1] the denominator (presumably the
                total number of votes - confirm against the dataset).
        Return:
            float: (x[0] / x[1]) * 100, or 0.0 when x[1] is 0.
    '''
    positive, total = x[0], x[1]
    if total == 0:
        # Same type as the regular branch, so every row of the mapped
        # column is a float.
        return 0.0
    return (positive / total) * 100

# Per-review helpfulness: percentage rate and first element of the
# 'helpful' pair (the positive votes).
kdf_reviews['helpful_rate'] = kdf_reviews['helpful'].map(get_helpful_rate)
kdf_reviews['helpful_pos'] = kdf_reviews['helpful'].map(lambda votes: votes[0])

# From here on kdf_reviews only holds reviews with a non-zero
# 'helpful_pos'.
kdf_reviews = kdf_reviews[kdf_reviews['helpful_pos'] != 0]

# Mean helpfulness rate per reviewer.
kdf_mean = (
    kdf_reviews[['reviewerID', 'helpful_rate']]
    .groupby('reviewerID')
    .mean()
    .sort_values(by=['helpful_rate'], ascending=False)
    .reset_index()
)

# Total 'helpful_pos' per reviewer.
kdf_sum = (
    kdf_reviews[['reviewerID', 'helpful_pos']]
    .groupby('reviewerID')
    .sum()
    .sort_values(by=['helpful_pos'], ascending=False)
    .reset_index()
)

# One row per reviewer, ordered by mean rate and then by total
# 'helpful_pos', both descending.
kdf_7_8 = kdf_mean.merge(kdf_sum, on='reviewerID').sort_values(
    by=['helpful_rate', 'helpful_pos'],
    ascending=False,
)

In [None]:
# Query #7: first 100 rows of kdf_7_8, which is sorted with the highest
# mean helpfulness rate first.
kdf_7 = kdf_7_8.head(n=100)

# Export the result (written with num_files=1).
kdf_7.to_csv(path='./to_csv/kdf_7.csv', num_files=1)

print("# I 100 reviewer che hanno effettuato recensioni con la maggiore utilità media #")
kdf_7

In [None]:
# Query #8: the 100 reviewers with the lowest mean helpfulness rate.
# The rows are obtained by sorting on the same keys as kdf_7_8 in
# ascending order and taking the head, instead of slicing kdf_7_8 from
# its end with a negative step, which performed poorly. The output is
# ordered from the lowest rate upwards.
kdf_8 = kdf_7_8 \
    .sort_values(by=['helpful_rate', 'helpful_pos'], ascending=True) \
    .head(100)

# Export the result (written with num_files=1).
kdf_8.to_csv(path='./to_csv/kdf_8.csv', num_files=1)

print("# I 100 reviewer che hanno effettuato recensioni con la minore utilità media #")
kdf_8

In [None]:
# Query #9: products ordered by ascending 'sales_rank_sports_etc';
# products without a rank are dropped.
kdf_9 = (
    kdf_meta
    .dropna(subset=['sales_rank_sports_etc'])
    .sort_values(by=['sales_rank_sports_etc'], ascending=True)
)
kdf_9 = kdf_9[['asin', 'sales_rank_sports_etc']]

# Export the first 100 rows (written with num_files=1).
kdf_9.head(100).to_csv(path='./to_csv/kdf_9.csv', num_files=1)

print('# I 100 prodotti con il migliore ranking nelle vendite #')
kdf_9.head(100)

In [None]:
# Query #10: mean 'sales_rank_sports_etc' per brand, lowest mean first.
# Rows with an empty or missing brand, or without a rank, are left out.
kdf_10 = (
    kdf_meta[kdf_meta.brand != '']
    .dropna(subset=['brand', 'sales_rank_sports_etc'])
    [['brand', 'sales_rank_sports_etc']]
    .groupby('brand')
    .mean()
    .sort_values(by=['sales_rank_sports_etc'], ascending=True)
    .reset_index()
)

# Export the first 50 rows (written with num_files=1).
kdf_10.head(50).to_csv(path='./to_csv/kdf_10.csv', num_files=1)

print('# Le 50 marche i cui prodotti hanno il ranking medio migliore #')
kdf_10.head(50)

In [None]:
# Compute the helpfulness rate from each merged row's own 'helpful'
# pair. Copying the column over from kdf_reviews would align the two
# frames on their index, and the index of the merge output does not
# identify rows of kdf_reviews, so rates would end up on unrelated rows.
kdf_merge['helpful_rate'] = kdf_merge['helpful'].map(get_helpful_rate)

# Keep only reviews with at least one positive vote, as the helpfulness
# queries do: get_helpful_rate yields 0 exactly when the first element
# of the pair is 0 (or the second one is 0).
kdf_subset = kdf_merge[kdf_merge['helpful_rate'] != 0]

# Drop useless columns.
kdf_subset = kdf_subset.drop(
    ['helpful', 'reviewerID', 'reviewerName', 'reviewTime', 'imUrl',
     'related', 'salesRank', 'categories'],
    axis=1)

# Drop null values on critical columns. 'reviewText' and 'summary' are
# included because their length is computed with len() further down,
# which cannot handle a missing value.
kdf_subset = kdf_subset.dropna(
    subset=['helpful_rate', 'price', 'title', 'description', 'brand',
            'sales_rank_sports_etc', 'reviewText', 'summary'])

# Join with kdf_1 in order to obtain reviews_count_product column.
kdf_subset = kdf_subset.merge(kdf_1, on='asin')
# Join with kdf_5 in order to obtain overall_mean_product column.
# N.B. The reviews_count_product attribute is dropped from kdf_5 because
# kdf_1 already provided it.
kdf_subset = kdf_subset.merge(kdf_5.drop('reviews_count_product'), on='asin')

# Module-level state of the mapper: brand name -> code, plus the next
# code that will be handed out.
brand_categorical = {}
index_categorical = 0
def map_brand_to_categorical(brand):
    '''
        Translate a brand's name into an integer code.

        A name seen for the first time receives the next free code
        (0, 1, 2, ...); a name seen before gets the code it was given
        the first time. The assignment is remembered in the module-level
        'brand_categorical' dictionary and 'index_categorical' counter.

        Args:
            brand (string): brand's name.
        Return:
            int: categorical value.
    '''
    global index_categorical

    try:
        return brand_categorical[brand]
    except KeyError:
        pass

    # First occurrence: register the name under the next free code.
    code = index_categorical
    brand_categorical[brand] = code
    index_categorical = code + 1
    return code

# Register every distinct brand on the driver before mapping the column.
# map_brand_to_categorical hands out codes from module-level state, and
# the mapping itself is executed by Spark's Python workers, which
# presumably each work on their own copy of that state (TODO confirm for
# the deployed Spark/Koalas versions). With the dictionary already
# complete, every copy answers from the same brand -> code table, so a
# brand always gets one code and two brands never share one.
for brand_name in sorted(kdf_subset['brand'].unique().to_numpy()):
    map_brand_to_categorical(brand_name)

# Create new columns:
# * brand_cat: brand's name in form of categorical;
# * product_title_len: product's title length;
# * product_description_len: product's description length;
# * review_summary_len: review's title length;
# * review_text_len: review's text length.
kdf_subset['brand_cat'] = kdf_subset['brand'].map(map_brand_to_categorical)
kdf_subset['product_description_len'] = kdf_subset['description'].map(len)
kdf_subset['product_title_len'] = kdf_subset['title'].map(len)
kdf_subset['review_text_len'] = kdf_subset['reviewText'].map(len)
kdf_subset['review_summary_len'] = kdf_subset['summary'].map(len)

# Drop non numeric column.
kdf_subset = kdf_subset.drop(
    ['asin', 'brand', 'description', 'title', 'reviewText', 'summary'],
    axis=1)

kdf_subset.to_csv(path='./to_csv/kdf_subset.csv', num_files=1)

# Compute correlation matrix.
corr_mat = kdf_subset.corr().to_numpy()

# Plot correlation matrix as an annotated heat map.
# plt.subplots() already creates the figure; opening another one with
# plt.figure() beforehand would only leave an extra, empty figure behind.
fig, ax = plt.subplots()
im = ax.imshow(corr_mat, cmap='coolwarm')

# One tick per column of kdf_subset, labelled with the column name.
column_labels = list(kdf_subset.columns)
tick_positions = range(len(column_labels))

ax.set_xticks(tick_positions)
ax.set_yticks(tick_positions)
ax.set_xticklabels(column_labels)
ax.set_yticklabels(column_labels)

# Slant the x labels so that they do not run into each other.
plt.setp(ax.get_xticklabels(), rotation=45, ha='right',
         rotation_mode='anchor')

# Write each coefficient, with two decimals, in the middle of its cell.
for row in tick_positions:
    for col in tick_positions:
        ax.text(col, row, '{:0.2f}'.format(corr_mat[row, col]),
                ha='center', va='center', color='w', size='3')

ax.set_title("Correlation matrix")
fig.colorbar(im, orientation='vertical')
fig.tight_layout()
plt.show()