## Montreal Python 69
### Monday Feb 5th 2018
#### Three Methods for Aggregating Subscribers' Interests
#### AGENDA:
      1- Using a Python Dictionary

      2- Using Apache Spark - groupByKey Transformation

      3- Using Apache Spark - reduceByKey Transformation

## STEP 0 - Problem Statement:
### For Subscribers of an Online Magazine
#### Aggregate each subscriber's interests/likes in topics such as: Auto, Food, Sports, Technology, Travel
### Original Data:

In [1]:
import pandas as pd
pd.set_option('display.max_rows',10)  # change preview settings

fname = './InterestData.csv'
subscrib_data = pd.read_csv(fname, delimiter =';')
subscrib_data

Unnamed: 0,ID,Interest
0,1001,Sports
1,1001,Techno
2,1002,Auto
3,1002,Food
4,1002,Sports
...,...,...
39,1018,Techno
40,1019,Food
41,1019,Techno
42,1020,Auto


### How the Data Will Be Aggregated by ID:

In [2]:
import pandas as pd
pd.set_option('display.max_rows',20)  # change preview settings

fname = './InterestAggregated.csv'
subscrib_data = pd.read_csv(fname, delimiter =';')
subscrib_data

Unnamed: 0,ID,Interest
0,1001,"Sports, Techno"
1,1002,"Auto, Food, Sports, Techno"
2,1003,Food
3,1004,"Auto, Food"
4,1005,"Auto, Food, Techno"
5,1006,"Food, Sports, Techno"
6,1007,"Sports, Techno"
7,1008,"Auto, Techno"
8,1009,Food
9,1010,"Auto, Food"


## RECIPE \#1: Using A Python Dictionary

In [3]:
import csv
from collections import defaultdict  # dict that creates a default value for missing keys

aggInterest = defaultdict(list)  # maps ID -> list of interests
header = None

data_file = './InterestData.csv'
with open(data_file, newline='') as csvfile:
    readCSV = csv.reader(csvfile, delimiter=';')
    header = next(readCSV)  # keep the header row (ID;Interest) for the output file
    for row in readCSV:
        aggInterest[row[0]].append(row[1])  # row[0] = ID, row[1] = Interest

# First attempt: each list is written as its Python repr, e.g. ['Sports', 'Techno']
with open('./aggInterest.csv', 'w', newline='') as csvfile:
    writeCSV = csv.writer(csvfile, delimiter=';')
    writeCSV.writerow(header)
    for idd, interest in aggInterest.items():
        writeCSV.writerow((idd, interest))

In [4]:
!cat aggInterest.csv

ID;Interest
1001;['Sports', 'Techno']
1002;['Auto', 'Food', 'Sports', 'Techno']
1003;['Food']
1004;['Auto', 'Food']
1005;['Auto', 'Food', 'Techno']
1006;['Food', 'Sports', 'Techno']
1007;['Sports', 'Techno']
1008;['Auto', 'Techno']
1009;['Food']
1010;['Auto', 'Food']
1011;['Food', 'Sport']
1012;['Auto', 'Food', 'Sports', 'Techno']
1013;['Auto']
1014;['Sports']
1015;['Auto', 'Food']
1016;['Auto', 'Food', 'Sports', 'Techno']
1017;['Sports']
1018;['Auto', 'Food', 'Techno']
1019;['Food', 'Techno']
1020;['Auto', 'Food']


In [5]:
# Second attempt: join each list into one comma-separated string before writing
with open('./aggInterest.csv', 'w', newline='') as csvfile:
    writeCSV = csv.writer(csvfile, delimiter=';')
    writeCSV.writerow(header)
    for idd, interestLst in aggInterest.items():
        interest = ",".join(interestLst)
        writeCSV.writerow((idd, interest))
!cat aggInterest.csv

ID;Interest
1001;Sports,Techno
1002;Auto,Food,Sports,Techno
1003;Food
1004;Auto,Food
1005;Auto,Food,Techno
1006;Food,Sports,Techno
1007;Sports,Techno
1008;Auto,Techno
1009;Food
1010;Auto,Food
1011;Food,Sport
1012;Auto,Food,Sports,Techno
1013;Auto
1014;Sports
1015;Auto,Food
1016;Auto,Food,Sports,Techno
1017;Sports
1018;Auto,Food,Techno
1019;Food,Techno
1020;Auto,Food
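
Because the fields are separated by `;` and the interests by `,`, the aggregated file can be read back without ambiguity. The following is a minimal sketch of that round trip, assuming the file layout shown in the output above; the `sample` string and the name `interests_by_id` are illustrative and not part of the original notebook.

```python
import csv
import io

# A few lines copied from the aggInterest.csv output, held in memory
# so the sketch runs without the file (illustrative sample only).
sample = (
    "ID;Interest\n"
    "1001;Sports,Techno\n"
    "1002;Auto,Food,Sports,Techno\n"
    "1003;Food\n"
)

# ';' separates the two columns, ',' separates interests inside the second column
reader = csv.DictReader(io.StringIO(sample), delimiter=";")
interests_by_id = {row["ID"]: row["Interest"].split(",") for row in reader}

print(interests_by_id["1002"])
```

Note that the IDs come back as strings here, just as they do in `aggInterest` in the recipe, since `csv` does no type conversion.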


## RECIPE \#2: Using Apache Spark - groupByKey Transformation
### 1- Read the subscriber interest data (ID and Interest columns only)
### 2- Use groupByKey to merge the interests of each ID
### 3- Convert each grouped list of values into one long string


In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.master", "local").appName("PythonCSV").getOrCreate()

file = "./InterestData.csv"

# read the CSV file, using its first row as the header, into a DataFrame
dataframe = spark.read.csv(file, sep=";", inferSchema=True, header=True)

df = dataframe.select("ID","Interest")

# each Row is an (ID, Interest) pair, so ID acts as the key
rdd1 = df.rdd.groupByKey()

# rdd1.collect() would return [(1001, <pyspark.resultiterable.ResultIterable object>),
#                              (1002, <pyspark.resultiterable.ResultIterable object>), ...]

# so map each ResultIterable to a plain list of values
rdd2 = rdd1.map(lambda pair: (pair[0], list(pair[1])))  # second element -> list of interest strings

print(rdd2,'\n')  # prints the RDD object itself, not its contents

rdd3 = rdd2.map(lambda pair: (pair[0], ",".join(pair[1])))  # list -> one comma-separated string

for row in rdd3.collect():
    print(row)

spark.stop()

PythonRDD[16] at RDD at PythonRDD.scala:48 

(1001, 'Sports,Techno')
(1002, 'Auto,Food,Sports,Techno')
(1003, 'Food')
(1004, 'Auto,Food')
(1005, 'Auto,Food,Techno')
(1006, 'Food,Sports,Techno')
(1007, 'Sports,Techno')
(1008, 'Auto,Techno')
(1009, 'Food')
(1010, 'Auto,Food')
(1011, 'Food,Sport')
(1012, 'Auto,Food,Sports,Techno')
(1013, 'Auto')
(1014, 'Sports')
(1015, 'Auto,Food')
(1016, 'Auto,Food,Sports,Techno')
(1017, 'Sports')
(1018, 'Auto,Food,Techno')
(1019, 'Food,Techno')
(1020, 'Auto,Food')


## RECIPE \#3: Using Apache Spark - reduceByKey Transformation
### 1- Read the subscriber interest data (ID and Interest columns only)
### 2- Use reduceByKey to merge the interests of each ID
### 3- Convert each set of interest values into one long string


In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.master", "local").appName("PythonCSV").getOrCreate()

file = "./InterestData.csv"

# the CSV file includes a header row
dataframe = spark.read.csv(file, sep=";", inferSchema=True, header=True)

df = dataframe.select("ID","Interest")

# mapValues: wrap each interest in a one-element set, giving (ID, {Interest}) pairs
# reduceByKey: union all the interest sets that share the same ID
rdd1 = df.rdd.mapValues(lambda interest: {interest})\
             .reduceByKey(lambda s1, s2: s1.union(s2))

# join each set into one comma-separated string (set order is not guaranteed)
rdd3 = rdd1.map(lambda pair: (pair[0], ",".join(pair[1])))

for row in rdd3.collect():
    print(row)

spark.stop()

(1001, 'Techno,Sports')
(1002, 'Auto,Techno,Sports,Food')
(1003, 'Food')
(1004, 'Auto,Food')
(1005, 'Auto,Techno,Food')
(1006, 'Food,Techno,Sports')
(1007, 'Techno,Sports')
(1008, 'Auto,Techno')
(1009, 'Food')
(1010, 'Auto,Food')
(1011, 'Sport,Food')
(1012, 'Auto,Techno,Sports,Food')
(1013, 'Auto')
(1014, 'Sports')
(1015, 'Auto,Food')
(1016, 'Auto,Techno,Sports,Food')
(1017, 'Sports')
(1018, 'Auto,Techno,Food')
(1019, 'Food,Techno')
(1020, 'Auto,Food')


## NOTES:
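### Using reduceByKey is generally better than groupByKey: reduceByKey merges the values of each key inside every partition before the shuffle, so less data crosses the network, whereas groupByKey shuffles every (ID, Interest) pair and builds the full list of values per key in memory. In both cases the processing runs on the Spark executors; only collect() brings the results back to the PySpark driver.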
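
To illustrate how the two transformations differ in the amount of data they shuffle, here is a pure-Python sketch that simulates two partitions. It is not Spark code: the `partitions` sample and the helper names `group_by_key_shuffle`, `reduce_by_key_shuffle` and `merge_after_shuffle` are hypothetical, and it only counts the records that would cross the partition boundary.

```python
from collections import defaultdict

# Hypothetical sample: two "partitions" of (ID, Interest) pairs.
partitions = [
    [(1001, "Sports"), (1001, "Techno"), (1002, "Auto")],
    [(1002, "Food"), (1002, "Sports"), (1001, "Sports")],
]

def group_by_key_shuffle(parts):
    """groupByKey style: every pair is sent through the shuffle unchanged."""
    return [pair for part in parts for pair in part]

def reduce_by_key_shuffle(parts):
    """reduceByKey style: union the values per key inside each partition first."""
    shuffled = []
    for part in parts:
        local = defaultdict(set)
        for key, interest in part:
            local[key] |= {interest}
        shuffled.extend(local.items())  # one record per key per partition
    return shuffled

def merge_after_shuffle(records):
    """Final merge per key; sets are used in both cases so results are comparable."""
    merged = defaultdict(set)
    for key, value in records:
        merged[key] |= value if isinstance(value, set) else {value}
    return {key: ",".join(sorted(values)) for key, values in merged.items()}

grouped = group_by_key_shuffle(partitions)
reduced = reduce_by_key_shuffle(partitions)

print(len(grouped), "records shuffled without a per-partition merge")
print(len(reduced), "records shuffled with a per-partition merge")
print(merge_after_shuffle(grouped) == merge_after_shuffle(reduced))
```

Both paths end with the same aggregated result; the per-partition merge simply sends fewer records through the shuffle, and the saving grows with the number of repeated keys in each partition.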

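### Using a set {} with reduceByKey also eliminates duplicate (ID, Interest) pairs, if InterestData.csv contains any. Sets are unordered, which is why the interests in the Recipe \#3 output appear in a different order than in Recipes \#1 and \#2.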
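
The effect of the set can be checked on a small example. This is a plain-Python sketch, assuming a hypothetical `rows` sample with one duplicated pair; `functools.reduce` stands in for what reduceByKey does for a single key, and `sorted()` is an optional addition (not used in the recipe) that makes the output order deterministic.

```python
from functools import reduce

# Hypothetical rows for one subscriber, with a duplicated (ID, Interest) pair.
rows = [(1001, "Techno"), (1001, "Sports"), (1001, "Techno")]

# Same two steps as the recipe, applied to a single key:
sets = [{interest} for _, interest in rows]          # like mapValues(lambda interest: {interest})
merged = reduce(lambda s1, s2: s1.union(s2), sets)   # same function passed to reduceByKey

as_list = ",".join(interest for _, interest in rows)  # a list keeps the duplicate
as_set = ",".join(sorted(merged))                     # a set drops it; sorted() fixes the order

print(as_list)
print(as_set)
```

Sorting before the join is a design choice: it costs little for a handful of interests per subscriber and makes the output comparable between runs.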
