-
Notifications
You must be signed in to change notification settings - Fork 0
/
avg_bid_price.py
29 lines (22 loc) · 891 Bytes
/
avg_bid_price.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import sys
from pyspark import SparkContext
import json
def get_indv_keyword_price(line):
    """Yield one ``[keyword, bid_price]`` pair per keyword of a JSON ad line.

    Args:
        line: One line of the raw ads file, holding a JSON object with at
            least the keys ``"bidPrice"`` and ``"keyWords"`` (a list).

    Yields:
        ``[keyword, bid_price]`` lists, one for each entry of ``keyWords``.
        Blank or whitespace-only lines yield nothing.

    Raises:
        ValueError: if a non-blank line is not valid JSON.
        KeyError: if the ad lacks ``"bidPrice"`` or ``"keyWords"``.
    """
    text = line.strip()
    if not text:
        # json.loads("") raises ValueError; a stray blank line in the input
        # would otherwise fail the whole Spark job.
        return
    indv_ad = json.loads(text)
    ad_price = indv_ad['bidPrice']
    # A generator (not a return) so flatMap can expand one ad into many
    # keyword pairs.
    for keyword in indv_ad['keyWords']:
        yield [keyword, ad_price]
if __name__ == "__main__":
file = sys.argv[1] #raw Ads file
sc = SparkContext(appName="avg_bid_price") # ads_0502.txt
data = sc.textFile(file)\
.flatMap(lambda line: get_indv_keyword_price(line))\
.map(lambda w: (w[0],[1, w[1]]))\
.reduceByKey(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1]))\
.mapValues(lambda k: k[1]/k[0])\
.sortBy(lambda a: a[1], False)
data.saveAsTextFile("avg_bid_price_output")
sc.stop()