# Introduction to Big Data Technologies - Ex3
## PySpark MapReduce
In this exercise you will answer data analysis questions using the map-reduce API exposed by the PySpark RDD object.

**Tip 1:** Think about how to handle missing or empty data.<br>
**Tip 2:** Some datasets may be very large, and commands on them can take a long time to run. Think about how you can limit the size of the dataset while you're experimenting with PySpark commands.
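One way to act on Tip 1 is to drop records whose key fields are missing before building key-value pairs. That way `None` never becomes a key and never ends up inside a set. The sketch below mimics this step on plain Python objects. The `Row` namedtuple and the sample values are made-up stand-ins for the RDD's `Row` objects. Inside an actual RDD chain, the same predicate would go into a `.filter(...)` transformation.

```python
from collections import namedtuple

# Hypothetical stand-in for the pyspark Row objects in gowalla_nyc_rdd
Row = namedtuple("Row", ["userid", "placeid"])

def has_keys(row):
    """Return True only if both userid and placeid are present."""
    return row.userid is not None and row.placeid is not None

# Made-up rows, two of them with a missing key field
rows = [Row(1, 10), Row(1, None), Row(None, 20), Row(2, 20)]

# The same predicate a PySpark chain could pass to .filter(has_keys)
clean = [r for r in rows if has_keys(r)]
print(clean)  # [Row(userid=1, placeid=10), Row(userid=2, placeid=20)]
```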

## 3.0. Loading the data

```python
# Load a CSV file as a PySpark RDD object
# (`spark` is the SparkSession predefined in the notebook environment)
gowalla_nyc_rdd = spark.read.format('csv') \
    .options(header='true', inferSchema='true') \
    .load('/FileStore/tables/gowalla_NYC_with_context.csv') \
    .rdd

# Notice the type: pyspark.rdd.RDD
type(gowalla_nyc_rdd)
```

```
Out[1]: pyspark.rdd.RDD
```

Let's take a quick look at the data.

```python
gowalla_nyc_rdd.count()
```

```
Out[2]: 497180
```

```python
# Take a ~1% random sample without replacement for quick experiments (see Tip 2)
gowalla_small_rdd = gowalla_nyc_rdd.sample(False, 0.01)
gowalla_small_rdd.count()
```

```
Out[3]: 5009
```
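Sampling without replacement (`withReplacement=False`) keeps each record independently with probability `fraction`. This is Bernoulli sampling. It explains why the sample above holds about 1% of the records rather than exactly 1%, and why its size changes between runs unless a seed is fixed. The plain-Python sketch below illustrates that behavior. `bernoulli_sample` is a hypothetical helper, not a PySpark function.

```python
import random

def bernoulli_sample(items, fraction, seed=None):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]

data = list(range(10_000))
sample_a = bernoulli_sample(data, 0.01, seed=7)
sample_b = bernoulli_sample(data, 0.01, seed=7)

print(len(sample_a))         # close to 100, not necessarily exactly 100
print(sample_a == sample_b)  # True: the same seed reproduces the same sample
```

In PySpark, `RDD.sample` also accepts a `seed` argument, for example `gowalla_nyc_rdd.sample(False, 0.01, seed=42)`. Passing one makes the experimental subset reproducible.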

```python
gowalla_nyc_rdd.take(5)
```

```
Out[4]: [Row(userid=5144, placeid=592233, datetime=datetime.datetime(2010, 7, 9, 1, 59, 28), lng=-73.9862982274, lat=40.7478105501, spot_categories="[{'url': '/categories/199', 'name': 'Hotel'}]", gencat='Travel', partOfDay='Night', isWeekend=False, Month=7, Season='Summer', onlyDate=datetime.date(2010, 7, 9), -3num=None, -3dist=None, -3_Community='none', -3_Entertainment='none', -3_Food='none', -3_Nightlife='none', -3_Outdoors='none', -3_Shopping='none', -3_Travel='none', minus3num=None, minus3dist=None),
 Row(userid=5144, placeid=935170, datetime=datetime.datetime(2010, 7, 8, 21, 59, 29), lng=-73.98103916, lat=40.78229314, spot_categories="[{'url': '/categories/47', 'name': 'Modern Hotel'}]", gencat='Travel', partOfDay='Evening', isWeekend=False, Month=7, Season='Summer', onlyDate=datetime.date(2010, 7, 8), -3num=None, -3dist=None, -3_Community='none', -3_Entertainment='none', -3_Food='none', -3_Nightlife='none', -3_Outdoors='none', -3_Shopping='none', -3_Travel='none', minus3num=Non
```

## 3.1. How many users have visited more than 10 places?

**Note:** the expected answer is a single number: the total number of unique users who have visited more than 10 different places.

You **must** perform this calculation using a single chain of RDD map-reduce transformations ending in a single action, and no other commands: neither additional PySpark commands nor general Python commands!



<div style="color:blue">(12 points)</div>

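```python
(gowalla_nyc_rdd
    .map(lambda r: (r.userid, {r.placeid}))   # (user, {place}) pairs
    .reduceByKey(lambda a, b: a.union(b))     # set of distinct places per user
    .filter(lambda x: len(x[1]) > 10)         # users with more than 10 distinct places
    .count())
```

```
Out[13]: 6150
```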
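The chain above builds, for each user, the set of distinct places using `reduceByKey` with set union. It then counts the users whose set has more than 10 elements. The sketch below replays the same steps on a small in-memory list, so the logic can be checked without a cluster. `reduce_by_key` is a hypothetical helper that mimics `reduceByKey` semantics, and the check-ins are made up. The sketch also shows an alternative that first deduplicates `(user, place)` pairs, as `distinct()` would in PySpark. That approach avoids building a potentially large set per user.

```python
from collections import Counter
from functools import reduce

def reduce_by_key(pairs, func):
    """Group (key, value) pairs by key and fold each group's values with func."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

# Made-up (userid, placeid) check-ins: user 1 visits 12 distinct places,
# user 2 visits 3 distinct places, some of them repeatedly.
checkins = [(1, p) for p in range(12)] + [(2, p) for p in [7, 7, 8, 9, 9, 9]]

# Approach used above: per-user set of places, then filter on set size
places_per_user = reduce_by_key(((u, {p}) for u, p in checkins), lambda a, b: a | b)
answer_sets = sum(1 for places in places_per_user.values() if len(places) > 10)

# Alternative: deduplicate (user, place) pairs first, then count places per user
distinct_counts = Counter(u for u, _ in set(checkins))
answer_distinct = sum(1 for n in distinct_counts.values() if n > 10)

print(answer_sets, answer_distinct)  # 1 1
```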

## 3.2. Display the location (latitude and longitude) of the place with the highest number of visitors during weekends.

**Note:** the expected answer is a tuple with the `lng` and the `lat`.

**Hint:** the `isWeekend` column is already defined.

You **must** perform this calculation using a single chain of RDD map-reduce transformations ending in a single action, and no other commands: neither additional PySpark commands nor general Python commands!

<div style="color:blue">(14 points)</div>

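```python
(gowalla_nyc_rdd
    .filter(lambda r: r.isWeekend)             # weekend check-ins only
    .map(lambda r: ((r.lng, r.lat), 1))        # one count per check-in at (lng, lat)
    .reduceByKey(lambda a, b: a + b)           # check-ins per coordinate pair
    .max(key=lambda x: x[1])[0])               # coordinates with the highest count
```

```
Out[6]: (-73.9862251282, 40.7568799674)
```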
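The same filter, count, and `max(key=...)` pattern can be replayed locally. In this sketch the check-ins and coordinates are made up, and `Checkin` is a hypothetical stand-in for the RDD's `Row` objects. Each record counts as one visit, matching the chain above, which counts check-ins rather than distinct users.

```python
from collections import Counter, namedtuple

# Hypothetical stand-in for pyspark Row objects
Checkin = namedtuple("Checkin", ["userid", "lng", "lat", "isWeekend"])

checkins = [
    Checkin(1, -73.98, 40.75, True),
    Checkin(2, -73.98, 40.75, True),
    Checkin(3, -73.99, 40.76, True),
    Checkin(4, -73.99, 40.76, False),  # weekday check-in, filtered out
    Checkin(5, -73.99, 40.76, False),  # weekday check-in, filtered out
]

# filter(isWeekend) -> map(((lng, lat), 1)) -> reduceByKey(add)
weekend_counts = Counter((c.lng, c.lat) for c in checkins if c.isWeekend)

# max(key=count)[0] -> coordinates of the busiest weekend place
top_place = max(weekend_counts.items(), key=lambda kv: kv[1])[0]
print(top_place)  # (-73.98, 40.75)
```

If several places tie for the highest count, Python's `max` returns the first one it encounters. Which one a distributed `max` returns may depend on record order, so ties are worth checking.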

## 3.3. What is the average number of visitors in July across all years in the dataset?

**Note:** the expected answer is a single number: (total number of visitors) / (total number of places).

**Note 2:** a user is counted only once per place.
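Written out, the quantity requested in the notes above is the mean number of distinct July visitors per place. In this formula, $P_7$ is the set of places with at least one July check-in, and $U_p$ is the set of distinct users who checked in at place $p$ in July. Both symbols are introduced here only for this formula.

$$
\bar{v}_{\text{July}} = \frac{\sum_{p \in P_7} |U_p|}{|P_7|}
$$

Counting each user once per place corresponds to taking the size of the set $U_p$ rather than the number of check-ins at $p$.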

You **must** perform this calculation using a single chain of RDD map-reduce transformations ending in a single action, and no other commands: neither additional PySpark commands nor general Python commands!
<div style="color:blue">(16 points)</div>

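```python
(gowalla_nyc_rdd
    .filter(lambda r: r.Month == 7)                          # July check-ins, any year
    .map(lambda r: (r.placeid, {r.userid}))                  # (place, {user})
    .reduceByKey(lambda a, b: a.union(b))                    # distinct users per place
    .map(lambda r: (None, (len(r[1]), 1)))                   # one shared key: (visitors, 1 place)
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))    # (total visitors, total places)
    .map(lambda x: x[1][0] / x[1][1])                        # average visitors per place
    .collect()[0])
```

```
Out[7]: 2.5038991560730692
```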
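The chain above computes (total number of visitors) / (total number of places) with a common trick. Every per-place visitor count is paired with a 1 under a single shared key. One `reduceByKey` then sums both the visitor counts and the number of places, and the final `map` divides them. The sketch below reproduces these steps on made-up `(month, placeid, userid)` tuples. All names and data are hypothetical.

```python
from functools import reduce

# Made-up check-ins as (month, placeid, userid) tuples
checkins = [
    (7, "A", 1), (7, "A", 1), (7, "A", 2),  # place A: users {1, 2}, user 1 counted once
    (7, "B", 3),                            # place B: users {3}
    (8, "B", 4),                            # August, filtered out
]

# filter(Month == 7) -> map((placeid, {userid})) -> reduceByKey(union)
users_per_place = {}
for month, place, user in checkins:
    if month == 7:
        users_per_place[place] = users_per_place.get(place, set()) | {user}

# map((None, (len(users), 1))) -> reduceByKey(pairwise sum)
total_visitors, total_places = reduce(
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
    ((len(users), 1) for users in users_per_place.values()),
)

average = total_visitors / total_places
print(average)  # 1.5
```

As a design alternative, `RDD.mean()` applied to an RDD of the per-place counts is a single action. It should give the same average, up to floating-point rounding.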

## 3.4. What is the place id visited by the largest number of users? What is its category? How many unique users visited it?

**Note:** the expected answer includes the `placeid`, its category, and the number of unique users who visited it.

You **must** perform this calculation using a single chain of RDD map-reduce transformations ending in a single action, and no other commands: neither additional PySpark commands nor general Python commands!

<div style="color:blue">(18 points)</div>

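```python
(gowalla_nyc_rdd
    .map(lambda r: (r.placeid, (r.gencat, {r.userid})))     # (place, (category, {user}))
    .reduceByKey(lambda a, b: (a[0], a[1].union(b[1])))     # keep one category, union users
    .map(lambda r: (r[0], r[1][0], len(r[1][1])))           # (place, category, #unique users)
    .max(key=lambda x: x[2]))                               # place with the most unique users

# Unused alternative that also counts total check-ins per place.
# Note: its max(key=lambda x: x[2]) ranks by check-ins, not by unique users.
# gowalla_nyc_rdd.map(lambda r: (r.placeid, (r.gencat, 1, {r.userid})))\
#     .reduceByKey(lambda a, b: (a[0], a[1] + b[1], a[2].union(b[2])))\
#     .map(lambda r: (r[0], r[1][0], r[1][1], len(r[1][2])))\
#     .max(key=lambda x: x[2])
```

```
Out[12]: (11844, 'Entertainment', 3458)
```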
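The chain above carries each place's category alongside its growing user set. It keeps a single category per place, which assumes a place has only one `gencat` value. The sketch below replays the chain on made-up `(placeid, gencat, userid)` tuples. The data and the `reduce_by_key` helper are hypothetical. In the toy data, place 11 has more check-ins but fewer distinct users than place 22, so ranking by unique users picks place 22.

```python
from functools import reduce

def reduce_by_key(pairs, func):
    """Group (key, value) pairs by key and fold each group's values with func."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}

# Made-up (placeid, gencat, userid) check-ins
checkins = [
    (11, "Food", 1), (11, "Food", 1), (11, "Food", 1), (11, "Food", 1),  # 4 check-ins, 1 user
    (22, "Travel", 1), (22, "Travel", 2), (22, "Travel", 3),             # 3 check-ins, 3 users
]

# map((placeid, (gencat, {userid}))) -> reduceByKey(keep category, union users)
per_place = reduce_by_key(
    ((place, (cat, {user})) for place, cat, user in checkins),
    lambda a, b: (a[0], a[1] | b[1]),
)

# map((placeid, gencat, n_users)) -> max by n_users
top = max(((p, cat, len(users)) for p, (cat, users) in per_place.items()),
          key=lambda t: t[2])
print(top)  # (22, 'Travel', 3)
```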

## 3.5. Was there a user who visited more than 100 different places during the summer of 2011? 

**Note:** Your job has to return a `True` or `False` value.

You **must** perform this calculation using a single chain of RDD map-reduce transformations ending in a single action, and no other commands: neither additional PySpark commands nor general Python commands!
<div style="color:blue">(18 points)</div>

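```python
(gowalla_nyc_rdd
    .filter(lambda r: r.Season == "Summer" and r.datetime.year == 2011)  # summer 2011 check-ins
    .map(lambda r: (r.userid, {r.placeid}))                              # (user, {place})
    .reduceByKey(lambda a, b: a.union(b))                                # distinct places per user
    .map(lambda r: (r[0], len(r[1])))                                    # (user, #distinct places)
    .filter(lambda r: r[1] > 100)                                        # users above 100 places
    .count() > 0)                                                        # does any such user exist?
```

```
Out[9]: True
```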
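The question only asks whether such a user exists. The sketch below mirrors the chain on made-up `(userid, placeid, datetime, Season)` tuples. It lowers the threshold to 2 places so the toy data stays readable. The data and the `any_user_above` / `min_places` names are hypothetical.

```python
from datetime import datetime

# Made-up check-ins: (userid, placeid, datetime, Season)
checkins = [
    (1, 10, datetime(2011, 7, 1), "Summer"),
    (1, 11, datetime(2011, 7, 2), "Summer"),
    (1, 12, datetime(2011, 8, 3), "Summer"),
    (2, 10, datetime(2010, 7, 1), "Summer"),  # wrong year for 2011
    (2, 13, datetime(2011, 1, 5), "Winter"),  # wrong season
]

def any_user_above(checkins, year, min_places):
    """True if some user visited more than `min_places` distinct places that summer."""
    places = {}
    for user, place, ts, season in checkins:
        if season == "Summer" and ts.year == year:
            places.setdefault(user, set()).add(place)
    return any(len(p) > min_places for p in places.values())

print(any_user_above(checkins, 2011, 2))  # True: user 1 has 3 distinct places
print(any_user_above(checkins, 2011, 3))  # False
```

In PySpark, `not rdd.isEmpty()` is an alternative to `count() > 0`. Because `isEmpty()` only needs to find one element, it is typically cheaper than counting every qualifying user.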

## 3.6. Which part of the day could introduce the greatest bias into an ML model when split by the weekend indicator?

**Hint:** a higher non-weekend:weekend ratio may point to a bias in an ML model.

**Note:** the expected answer is a tuple with the part of day and its ratio.

You **must** perform this calculation using a single chain of RDD map-reduce transformations ending in a single action, and no other commands: neither additional PySpark commands nor general Python commands!
<div style="color:blue">(22 points)</div>

```python
(gowalla_nyc_rdd
    .map(lambda r: (r.partOfDay, (0, 1) if r.isWeekend else (1, 0)))  # (part, (weekday, weekend))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))             # summed counts per part of day
    .map(lambda r: (r[0], r[1][0] / r[1][1]))                          # non-weekend:weekend ratio
    .max(key=lambda x: x[1]))                                          # part of day with highest ratio
```

```
Out[10]: ('Morning', 2.6428978893325725)
```
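The chain above keeps a `(weekday, weekend)` counter pair per part of day, sums the pairs, and divides. The sketch below repeats this on made-up `(partOfDay, isWeekend)` records. It also adds a guard that the original chain does not have: a part of day with no weekend check-ins gets a ratio of `float("inf")` instead of raising `ZeroDivisionError`. That guard is one possible way to apply Tip 1, and it means such a part of day ranks highest. The data, the `ratio` helper, and the guard are all assumptions of this sketch.

```python
# Made-up check-ins as (partOfDay, isWeekend) pairs
checkins = (
    [("Morning", False)] * 6 + [("Morning", True)] * 2
    + [("Evening", False)] * 3 + [("Evening", True)] * 3
    + [("Night", False)] * 2  # no weekend check-ins at night
)

# map((partOfDay, (1, 0) on weekdays or (0, 1) on weekends)) -> reduceByKey(pairwise sum)
totals = {}
for part, is_weekend in checkins:
    pair = (0, 1) if is_weekend else (1, 0)
    totals[part] = tuple(x + y for x, y in zip(totals.get(part, (0, 0)), pair))

def ratio(weekday, weekend):
    """Non-weekend:weekend ratio, guarding against a zero weekend count."""
    return weekday / weekend if weekend else float("inf")

ratios = {part: ratio(*counts) for part, counts in totals.items()}
top = max(ratios.items(), key=lambda kv: kv[1])
print(ratios)  # {'Morning': 3.0, 'Evening': 1.0, 'Night': inf}
print(top)     # ('Night', inf)
```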

## Good Luck!