In [None]:
# Spark configuration is deliberately left empty so that the settings already
# pre-configured on the Amazon EMR cluster (5 nodes: 1 master, 4 workers) are
# used unchanged.
conf = SparkConf()
sc = SparkContext(conf=conf)

# The dataset (used here to check the program's correctness) is split into
# five CSV parts in the "amazonfriends" S3 bucket; each part becomes one RDD.
s3_part_paths = (
    's3://amazonfriends/big_data_1.csv',
    's3://amazonfriends/big_data_2.csv',
    's3://amazonfriends/big_data_3.csv',
    's3://amazonfriends/big_data_4.csv',
    's3://amazonfriends/big_data_5.csv',
)
line_1, line_2, line_3, line_4, line_5 = [sc.textFile(path) for path in s3_part_paths]

# Combine the five parts into one RDD holding the whole dataset.
df = sc.union([line_1, line_2, line_3, line_4, line_5])

# Alternative timing reference: the start time reported by this SparkContext.
start_time = sc.startTime


In [None]:
def _parse_friend_line(line):
    """Turn one CSV line into a ``((user, age), 1)`` pair.

    Field 1 is used as the user and field 2 as the age (parsed with
    ``float``). The value 1 counts the line as one friend connection for that
    (user, age) key.
    """
    fields = line.split(',')
    user_age_key = (fields[1], float(fields[2]))
    return user_age_key, 1


# One ((user, age), 1) record per input line.
data = df.map(_parse_friend_line)



# get age-period for each age
# the age range is converted to age group, to be used later
def get_age_period(age):
    """Return the age-group label for ``age``, or None when it is outside [16, 72).

    The groups are half-open intervals:
        [16, 20) -> 'teens_youthAdult'
        [20, 40) -> 'Adult'
        [40, 60) -> 'MiddleAge'
        [60, 72) -> 'old'

    Ages arrive as floats (the parser calls ``float`` on the age field), so
    the bounds are checked with comparisons instead of ``range`` membership.
    ``19.5 in range(16, 20)`` is False, and for non-int values it is also a
    linear scan, so fractional ages used to fall into the None group.
    """
    if 16 <= age < 20:
        return 'teens_youthAdult'
    if 20 <= age < 40:
        return 'Adult'
    if 40 <= age < 60:
        return 'MiddleAge'
    if 60 <= age < 72:
        return 'old'
    # Ages outside every group have no label.
    return None

# Sum the per-line 1s so that each (user, age) key carries its friend count.
user_num_friends = data.reduceByKey(lambda left, right: left + right)


def _to_age_group(record):
    """Re-key a ``((user, age), friend_count)`` record by the age group of ``age``."""
    user_and_age = record[0]
    friend_count = record[1]
    return get_age_period(user_and_age[1]), friend_count


# (age group, friend_count) per user; ages get_age_period does not label end
# up under the key None.
age_group_friends = user_num_friends.map(_to_age_group)



# Average number of friends per age group, computed with a plain
# reduceByKey (sum, count) aggregation -- no accumulators are involved:
#   1. pair every friend count with a 1:   (group, (friend_count, 1))
#   2. sum both components per group:      (group, (total_friends, n_keys))
#      where n_keys is the number of (user, age) keys in that group
#   3. divide the total by that count.

age_group_friends = age_group_friends.mapValues(lambda x: (x, 1))
print(age_group_friends.take(30))

age_group_total_friends = age_group_friends.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
print(age_group_total_friends.take(30))

# Both components are integer sums; float() keeps the average fractional
# even when PySpark runs under Python 2, where int / int truncates.
averageByAge = age_group_total_friends.mapValues(lambda x: float(x[0]) / x[1])
result = averageByAge.collect()
for key, value in result:
    print(key, value)
