# TDT4305 Project 1 - RDD Tasks

In [1]:
from datetime import datetime
from base64 import b64decode

## Loading data files

In [2]:
# Load the reviews file as rows of tab-separated fields. zipWithIndex numbers
# every line from 0, so keeping only non-zero indices drops the first line
# (the column header). All double quotes are stripped before splitting.
# Columns: "review_id","user_id","business_id","review_text","review_date"
reviews = (
    sc.textFile('../data/yelp_top_reviewers_with_reviews.csv')
    .zipWithIndex()
    .filter(lambda numbered: numbered[1] != 0)
    .map(lambda numbered: numbered[0].replace('"', '').split('\t'))
)

In [3]:
# Load businesses as rows of tab-separated fields, dropping line 0 (the column
# header) via zipWithIndex. All double quotes are stripped before splitting.
# Columns: "business_id","name","address","city","state","postal_code",
#          "latitude","longitude","stars","review_count","categories"
businesses = (
    sc.textFile('../data/yelp_businesses.csv')
    .zipWithIndex()
    .filter(lambda numbered: numbered[1] != 0)
    .map(lambda numbered: numbered[0].replace('"', '').split('\t'))
)

In [4]:
# Load the friendship edge list as comma-separated rows, dropping line 0
# (the column header) via zipWithIndex. Double quotes are stripped first.
# Columns: "src_user_id","dst_user_id"
friendships = (
    sc.textFile('../data/yelp_top_users_friendship_graph.csv')
    .zipWithIndex()
    .filter(lambda numbered: numbered[1] != 0)
    .map(lambda numbered: numbered[0].replace('"', '').split(','))
)

## Task 1

### Counting the number of rows

In [26]:
# Number of data rows in each dataset (header line already removed at load).
review_count, business_count, friendship_count = (
    reviews.count(),
    businesses.count(),
    friendships.count(),
)

In [27]:
# Display the number of review rows.
review_count

883737

In [28]:
# Display the number of business rows.
business_count

192609

In [29]:
# Display the number of friendship edges.
friendship_count

1938472

In [None]:
# Task 1 answer: a header line followed by the three row counts.
with open('solutions/task1.csv', 'w') as out:
    out.write('review_count, business_count, friendship_count\n')
    out.write(f'{review_count}, {business_count}, {friendship_count}')

## Task 2

### a) Finding the number of distinct users

In [None]:
# User ids are gathered from the source column of the friendship graph and
# the user_id column of the reviews; the answer is the number of unique ids.
# NOTE(review): dst_user_id is not included here — confirm that is intended.
f_users = friendships.map(lambda edge: edge[0])
r_users = reviews.map(lambda review: review[1])

all_users = f_users.union(r_users)
distinct_users = all_users.distinct().count()

In [None]:
# Task 2a answer: the distinct-user count as bare text.
with open('solutions/task2a.csv', 'w') as out:
    print(distinct_users, end='', file=out)

### b) Average number of characters in a review

In [6]:
# review_text is base64-encoded: decode it to a UTF-8 string, then average
# the string lengths (in characters) over all reviews.
avg_characters_in_review = reviews \
    .map(lambda review: b64decode(review[3]).decode('utf8')) \
    .map(len) \
    .mean()

In [7]:
# Display the mean decoded review length in characters.
avg_characters_in_review

856.8332433744425

In [8]:
# Task 2b answer: the mean review length as bare text.
with open('solutions/task2b.csv', 'w') as out:
    print(avg_characters_in_review, end='', file=out)

### c) Top 10 businesses by number of reviews

In [None]:
# Business ids ranked by how many reviews they received, most reviewed first.
top_businesses_by_review = (
    reviews.map(lambda review: (review[2], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda business_total: business_total[1], ascending=False)
    .keys()
)

In [None]:
# Preview the ten business ids with the most reviews.
top_businesses_by_review.take(10)

In [None]:
# Keep the first ten ranked business ids and write them as one output part.
(top_businesses_by_review
    .zipWithIndex()
    .filter(lambda ranked: ranked[1] < 10)
    .keys()
    .coalesce(1)
    .saveAsTextFile('solutions/raw/task2c'))

### d) Reviews per year

In [None]:
# (year, number_of_reviews) pairs in ascending year order. review_date is
# parsed as a float timestamp and converted with datetime.fromtimestamp,
# which uses the machine's local time zone.
# NOTE(review): year boundaries therefore depend on where this runs — confirm.
reviews_per_year = (
    reviews.map(lambda review: float(review[4]))
    .map(lambda ts: (datetime.fromtimestamp(ts).year, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortByKey()
)

In [None]:
# Show every (year, review count) pair.
reviews_per_year.collect()

In [None]:
# One "year,count" line per year, written as a single output part.
reviews_per_year \
    .map(lambda year_total: f'{year_total[0]},{year_total[1]}') \
    .coalesce(1) \
    .saveAsTextFile('solutions/raw/task2d')

### e) First and last review

In [None]:
# Earliest and latest review timestamps. review_date is parsed as a float
# timestamp; min()/max() find the extremes in a single pass each instead of
# fully sorting (and shuffling) the whole RDD twice. Both raise ValueError on
# an empty RDD, as sortBy(...).first() did.
# NOTE(review): fromtimestamp converts to the machine's local time zone — confirm.
dates = reviews.map(lambda row: float(row[4]))

first_review = datetime.fromtimestamp(dates.min())
last_review = datetime.fromtimestamp(dates.max())

In [None]:
# Earliest review formatted as day.month.year, hour:minute:second.
first_review.strftime("%d.%m.%Y, %H:%M:%S")

In [None]:
# Latest review formatted as day.month.year, hour:minute:second.
last_review.strftime("%d.%m.%Y, %H:%M:%S")

In [None]:
# Task 2e answer: header line, then both datetimes in their default str() form.
with open('solutions/task2e.csv', 'w') as out:
    out.write('first_review, last_review\n')
    out.write(f'{first_review}, {last_review}')

### f) Pearson Correlation Coefficient

In [9]:
# Per user: (user_id, number_of_reviews, mean_review_length_in_characters).
# aggregateByKey builds a (review_count, total_length) accumulator per user
# from the decoded base64 review_text lengths.
count_and_length = (
    reviews.map(lambda review: (review[1], len(b64decode(review[3]).decode('utf8'))))
    .aggregateByKey(
        (0, 0),
        lambda acc, length: (acc[0] + 1, acc[1] + length),
        lambda left, right: (left[0] + right[0], left[1] + right[1]),
    )
    .map(lambda user_acc: (user_acc[0], user_acc[1][0], user_acc[1][1] / user_acc[1][0]))
)

In [10]:
# Inspect one (user_id, review_count, mean_review_length) record.
count_and_length.first()

('-InhDRRVG7wrwsgAUvN4Qw', 555, 1118.9351351351352)

In [11]:
# agg = (number_of_users, sum of per-user review counts,
#        sum of per-user mean review lengths).
# Dividing by the user count gives the sample means used by the Pearson
# correlation: x_avg (review count) and y_avg (mean review length).
agg = count_and_length.aggregate(
    (0, 0, 0),
    lambda acc, rec: (acc[0] + 1, acc[1] + rec[1], acc[2] + rec[2]),
    lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2]),
)
x_avg, y_avg = agg[1] / agg[0], agg[2] / agg[0]

In [12]:
def _pcc_terms(rec):
    """Return (dx*dy, dx**2, dy**2) for one (user_id, count, mean_length) record.

    dx and dy are the record's deviations from the global means x_avg/y_avg.
    """
    dx = rec[1] - x_avg
    dy = rec[2] - y_avg
    return dx * dy, dx ** 2, dy ** 2


# Pearson correlation: sum(dx*dy) / (sqrt(sum(dx^2)) * sqrt(sum(dy^2))).
pcc_agg = count_and_length \
    .map(_pcc_terms) \
    .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2]))

pcc = pcc_agg[0] / ((pcc_agg[1] ** 0.5) * (pcc_agg[2] ** 0.5))

In [13]:
# Pearson correlation between a user's review count and mean review length.
pcc

0.12587077666409704

In [14]:
# Task 2f answer: the correlation coefficient as bare text.
with open('solutions/task2f.csv', 'w') as out:
    print(pcc, end='', file=out)

## Task 3

### a) Average rating by city

In [None]:
# Mean business star rating per city, sorted alphabetically by city name.
# Values are accumulated as (stars_sum, business_count) per city.
avg_rating_by_city = (
    businesses.map(lambda biz: (biz[3], (float(biz[8]), 1)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .mapValues(lambda sum_count: sum_count[0] / sum_count[1])
    .sortByKey()
)

In [None]:
# Show all (city, average stars) pairs, ordered by city.
avg_rating_by_city.collect()

In [None]:
# One "city,average" line per city, written as a single output part.
(avg_rating_by_city
    .map(lambda city_avg: f'{city_avg[0]},{city_avg[1]}')
    .coalesce(1)
    .saveAsTextFile('solutions/raw/task3a'))

### b) Top 10 most frequent categories

In [None]:
# Category frequencies over all businesses, most frequent first.
# The categories column is a comma-separated list. A business with an empty
# categories field would otherwise contribute an empty-string "category",
# so names that are blank after stripping are dropped before counting.
top_categories = (
    businesses.flatMap(lambda row: row[10].split(','))
    .map(lambda category: category.strip())
    .filter(lambda category: category != '')
    .map(lambda category: (category, 1))
    .reduceByKey(lambda x, y: x + y)
    .sortBy(lambda row: row[1], ascending=False)
)

In [None]:
# Preview the ten most frequent (category, count) pairs.
top_categories.take(10)

In [None]:
# Keep the ten highest-ranked categories and write only their names
# (the first keys() drops the rank, the second drops the count).
(top_categories
    .zipWithIndex()
    .filter(lambda ranked: ranked[1] < 10)
    .keys()
    .keys()
    .coalesce(1)
    .saveAsTextFile('solutions/raw/task3b'))

### c) Geographical centroid

In [None]:
# Per postal code: (mean_latitude, mean_longitude) of its businesses.
# Accumulator per postal code is (business_count, latitude_sum, longitude_sum).
pc_centroids = (
    businesses.map(lambda biz: (biz[5], (1, float(biz[6]), float(biz[7]))))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2]))
    .map(lambda pc_acc: (
        pc_acc[0],
        (pc_acc[1][1] / pc_acc[1][0], pc_acc[1][2] / pc_acc[1][0]),
    ))
)

In [None]:
# Sample five (postal_code, (mean_latitude, mean_longitude)) entries.
pc_centroids.take(5)

In [None]:
# One "postal_code, latitude, longitude" line per postal code, single part.
(pc_centroids
    .map(lambda pc: f'{pc[0]}, {pc[1][0]}, {pc[1][1]}')
    .coalesce(1)
    .saveAsTextFile('solutions/raw/task3c'))

## Task 4

### a) Top in- and out-degrees

In [None]:
# Per user: (in_degree, out_degree). Each edge src -> dst adds one to the
# source's out-degree (second slot) and one to the destination's in-degree
# (first slot); users are summed with reduceByKey.
in_out_degrees = (
    friendships.flatMap(lambda edge: ((edge[0], (0, 1)), (edge[1], (1, 0))))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
)

In [None]:
# (user_id, in_degree) pairs ranked by in-degree, highest first.
top_in = (
    in_out_degrees.mapValues(lambda degrees: degrees[0])
    .sortBy(lambda user_deg: user_deg[1], ascending=False)
)

In [None]:
# Preview the ten (user_id, in_degree) pairs with the highest in-degree.
top_in.take(10)

In [None]:
# (user_id, out_degree) pairs ranked by out-degree, highest first.
top_out = (
    in_out_degrees.mapValues(lambda degrees: degrees[1])
    .sortBy(lambda user_deg: user_deg[1], ascending=False)
)

In [None]:
# Preview the ten (user_id, out_degree) pairs with the highest out-degree.
top_out.take(10)

In [None]:
# Write the user ids of the ten highest in-degree and out-degree users, each
# to its own single-part output. The first keys() drops the rank and the
# second drops the degree, leaving only the user id.
(top_in
    .zipWithIndex()
    .filter(lambda ranked: ranked[1] < 10)
    .keys()
    .keys()
    .coalesce(1)
    .saveAsTextFile('solutions/raw/task4a-in'))

(top_out
    .zipWithIndex()
    .filter(lambda ranked: ranked[1] < 10)
    .keys()
    .keys()
    .coalesce(1)
    .saveAsTextFile('solutions/raw/task4a-out'))

### b) Mean and median in- and out-degrees

In [None]:
# Average in-degree and out-degree over every user in the degree table.
mean_in = in_out_degrees.values().map(lambda degrees: degrees[0]).mean()
mean_out = in_out_degrees.values().map(lambda degrees: degrees[1]).mean()

In [None]:
# Display the mean in-degree.
mean_in

In [None]:
# Display the mean out-degree.
mean_out

In [None]:
# Median in-degree and out-degree. `count` is the number of users in the
# degree table.
count = in_out_degrees.count()


def _median(values, n):
    """Return the median of the numeric RDD `values`, which holds `n` elements.

    The values are sorted ascending and numbered 0..n-1 with zipWithIndex, so
    the middle element sits at index n // 2 for odd n, and the two middle
    elements sit at n // 2 - 1 and n // 2 for even n. Odd n returns the middle
    element itself; even n returns the mean of the two middle elements.
    Returns None when n == 0.
    """
    if n == 0:
        return None
    middle = {n // 2} if n % 2 == 1 else {n // 2 - 1, n // 2}
    # One filtered pass collects just the middle element(s); lookup() on an
    # unpartitioned RDD would scan the whole RDD once per index instead.
    picked = (
        values.sortBy(lambda v: v)
        .zipWithIndex()
        .filter(lambda value_idx: value_idx[1] in middle)
        .keys()
        .collect()
    )
    if len(picked) == 1:
        return picked[0]
    return (picked[0] + picked[1]) / 2


# The sorted degree RDDs stay local to _median, so top_in / top_out from
# task 4a keep their ranked (user_id, degree) meaning if that cell is re-run.
median_in = _median(in_out_degrees.map(lambda row: row[1][0]), count)
median_out = _median(in_out_degrees.map(lambda row: row[1][1]), count)

In [None]:
# Display the median in-degree.
median_in

In [None]:
# Display the median out-degree.
median_out

In [None]:
# Task 4b answers: one file per direction, header "mean, median" then values.
with open('solutions/task4b-in.csv', 'w') as out:
    out.write('mean, median\n')
    out.write(f'{mean_in},{median_in}')

with open('solutions/task4b-out.csv', 'w') as out:
    out.write('mean, median\n')
    out.write(f'{mean_out},{median_out}')