In [1]:
# import libraries

from pyspark import SparkContext,SparkConf

In [2]:
# Create (or reuse) the SparkContext used by every cell below.
# The configuration selects the "local" master and names the application.

conf = SparkConf()
conf.setMaster("local")
conf.setAppName("ratings_count")
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
# Count how many times each rating (1-5) occurs in the dataset.

ratings_rdd = sc.textFile("./data/u.data")

# An RDD is a distributed dataset, not a local Python list, so every step
# below is expressed as an RDD transformation or action.

# Split each line on whitespace and keep only the rating, which is the third
# field (index 2). The rating stays a string; that is fine for counting.
rating_list = ratings_rdd.map(lambda line: line.split()[2])

# Note: rating_list.distinct() is a transformation that yields a new RDD of
# the unique ratings; it needs .collect() to obtain them as a local list.

# countByValue() is an action: it returns the number of occurrences of each
# distinct value to the driver as a dictionary {rating: count}.
ratings_count = rating_list.countByValue()

# Print the counts ordered by rating. Iterating over the sorted items gives
# the same output as looking each sorted key up in the dictionary.
for rating, count in sorted(ratings_count.items()):
    print(rating, count)



1 6110
2 11370
3 27145
4 34174
5 21201


In [4]:
# Find the average number of friends for each age in the fake friends data



#parse data to retrieve just required entities

def parsdata(line):
    """Extract the (age, number_of_friends) pair from one CSV record.

    The line is split on commas; the third field (index 2) is read as the
    age and the fourth field (index 3) as the friend count, both as ints.
    """
    columns = line.split(",")
    return int(columns[2]), int(columns[3])

# Load the raw CSV lines and parse each one into an (age, number_of_friends) pair.
rdd = sc.textFile("./data/fakefriends.csv")

friends = rdd.map(parsdata)

# Pair every friend count with a 1 so that the sum of friends and the number
# of people can be accumulated together: (age, n) -> (age, (n, 1)).
friends_count = friends.map(lambda x: (x[0], (x[1], 1)))

# No intermediate collect() here: a collect() in the middle of a cell runs a
# full Spark job and ships every record to the driver, only for the result to
# be discarded (just the last expression of a cell is displayed).

# reduceByKey merges all values that share the same key (age) using the given
# function, producing (age, (total_friends, number_of_people)).
total_friends_count = friends_count.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Average = total friends / number of people, per age.
avg_friends_count = total_friends_count.mapValues(lambda x: x[0] / x[1])

# Display the averages ordered by age.
avg_friends_count.sortByKey().collect()


[(18, 343.375),
 (19, 213.27272727272728),
 (20, 165.0),
 (21, 350.875),
 (22, 206.42857142857142),
 (23, 246.3),
 (24, 233.8),
 (25, 197.45454545454547),
 (26, 242.05882352941177),
 (27, 228.125),
 (28, 209.1),
 (29, 215.91666666666666),
 (30, 235.8181818181818),
 (31, 267.25),
 (32, 207.9090909090909),
 (33, 325.3333333333333),
 (34, 245.5),
 (35, 211.625),
 (36, 246.6),
 (37, 249.33333333333334),
 (38, 193.53333333333333),
 (39, 169.28571428571428),
 (40, 250.8235294117647),
 (41, 268.55555555555554),
 (42, 303.5),
 (43, 230.57142857142858),
 (44, 282.1666666666667),
 (45, 309.53846153846155),
 (46, 223.69230769230768),
 (47, 233.22222222222223),
 (48, 281.4),
 (49, 184.66666666666666),
 (50, 254.6),
 (51, 302.14285714285717),
 (52, 340.6363636363636),
 (53, 222.85714285714286),
 (54, 278.0769230769231),
 (55, 295.53846153846155),
 (56, 306.6666666666667),
 (57, 258.8333333333333),
 (58, 116.54545454545455),
 (59, 220.0),
 (60, 202.71428571428572),
 (61, 256.22222222222223),
 (62, 2

In [5]:
# Find the minimum temperature recorded at each station


def parse(line):
    """Extract (station_id, temp_type, temperature) from one comma-separated record.

    Fields used: index 0 (station id), index 2 (measurement type) and
    index 3 (the reading). The reading is scaled by 0.1 and then passed
    through x * 9/5 + 32 -- presumably tenths of a degree Celsius being
    converted to Fahrenheit; confirm against the dataset description.
    """
    fields = line.split(",")
    station, kind = fields[0], fields[2]
    # Keep this exact operation order so the floating-point result is unchanged.
    fahrenheit = float(fields[3]) * 0.1 * (9 / 5) + 32
    return station, kind, fahrenheit

# Raw temperature records, one comma-separated line per reading.

temp_rdd = sc.textFile("./data/1800.csv")

# Parse every line into a (station_id, temp_type, temperature) tuple.

temperature = temp_rdd.map(parse)

# Keep only the records whose type field contains "TMIN".

temp_type = temperature.filter(lambda rec: "TMIN" in rec[1])

# Drop the type field, leaving (station_id, temperature) key/value pairs.

station_temperature = temp_type.map(lambda rec: (rec[0], rec[2]))

# Reduce per station id, keeping the smaller of each pair of temperatures,
# which leaves the minimum temperature for every station.
final_result = station_temperature.reduceByKey(min)
final_result.collect()

[('ITE00100554', 5.359999999999999), ('EZE00100082', 7.699999999999999)]

In [6]:
# Find the maximum temperature recorded at each station

def parse(lines):
    """Extract (station_id, temp_type, temperature_text) from one comma-separated record.

    Fields used: index 0 (station id), index 2 (measurement type) and
    index 3 (the reading). The reading is scaled by 0.1, passed through
    x * 9/5 + 32, and returned as a string with two decimal places.

    NOTE: the temperature is returned as text, so callers that compare
    temperatures must convert back to a number first; comparing the strings
    directly orders them lexicographically.
    """
    parts = lines.split(",")
    station_id = parts[0]
    reading_type = parts[2]
    fahrenheit = float(parts[3]) * 0.1 * (9 / 5) + 32
    return station_id, reading_type, format(fahrenheit, ".2f")


# Load the raw records and parse each into (station_id, temp_type, temperature).
rdd = sc.textFile("./data/1800.csv")
req_details = rdd.map(parse)

# Keep only TMAX records. Test the type field (index 1) explicitly: a bare
# `"TMAX" in x` checks every element of the tuple, not just the type.
filtered = req_details.filter(lambda x: x[1] == "TMAX")

# Reduce each record to a (station_id, temperature) key/value pair. The lambda
# must return the pair wrapped in parentheses so it is a single tuple. The
# result is an ordinary variable rather than an attribute set on another RDD.
stan_temp = filtered.map(lambda x: (x[0], x[2]))

# parse() hands the temperature back as formatted text, so compare by numeric
# value (key=float). A plain max() on the strings is lexicographic and would
# rank e.g. '9.50' above '10.00'.
reduced = stan_temp.reduceByKey(lambda x, y: max(x, y, key=float))
reduced.collect()



[('ITE00100554', '90.14'), ('EZE00100082', '90.14')]

In [7]:
# Total amount spent by each customer: start from the raw order lines.

customer_rdd = sc.textFile(name="./data/customer-orders.csv")

#parse to get req field
def parse(line):
    """Extract the (customer_id, money_spent) pair from one comma-separated order line.

    The first field (index 0) is read as an int customer id and the third
    field (index 2) as a float amount.
    """
    fields = line.split(",")
    return int(fields[0]), float(fields[2])

# Parse every order line into a (customer_id, money_spent) pair.
parsed_orders = customer_rdd.map(parse)

# Add up the amounts that share the same customer id.
Total_money_spent = parsed_orders.reduceByKey(lambda running, amount: running + amount)

# Swap each pair to (total, customer_id) so the total becomes the key, then
# sort by that key in descending order to list the biggest spenders first.
final = Total_money_spent.map(lambda pair: (pair[1], pair[0]))
final.sortByKey(ascending=False).collect()

[(6375.449999999997, 68),
 (6206.199999999999, 73),
 (6193.109999999999, 39),
 (6065.389999999999, 54),
 (5995.660000000003, 71),
 (5994.59, 2),
 (5977.189999999995, 97),
 (5963.109999999999, 46),
 (5696.840000000003, 42),
 (5642.89, 59),
 (5637.62, 41),
 (5524.949999999998, 0),
 (5517.240000000001, 8),
 (5503.43, 85),
 (5497.479999999998, 61),
 (5496.050000000004, 32),
 (5437.7300000000005, 58),
 (5415.150000000001, 63),
 (5413.510000000001, 15),
 (5397.879999999998, 6),
 (5379.280000000002, 92),
 (5368.83, 43),
 (5368.249999999999, 70),
 (5337.44, 72),
 (5330.8, 34),
 (5322.649999999999, 9),
 (5298.090000000002, 55),
 (5290.409999999998, 90),
 (5288.689999999996, 64),
 (5265.750000000001, 93),
 (5259.920000000003, 24),
 (5254.659999999998, 33),
 (5253.3200000000015, 62),
 (5250.4, 26),
 (5245.059999999999, 52),
 (5206.4, 87),
 (5186.429999999999, 40),
 (5155.419999999999, 35),
 (5152.290000000002, 11),
 (5140.3499999999985, 65),
 (5123.010000000001, 69),
 (5112.709999999999, 81),
 (5