/
mongodb.py
66 lines (49 loc) · 2.29 KB
/
mongodb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf, col
from pyspark.sql.types import StringType, StructType, DoubleType, StructField
from argparse import ArgumentParser
import pandas as pd
from pymongo import MongoClient

# Command-line configuration: source and destination MongoDB connection URIs.
parser = ArgumentParser()
parser.add_argument('-ip', '--mongodb_input_uri', required=True)
parser.add_argument('-uri', '--mongodb_output_uri', required=True)
args = parser.parse_args()

sc = SparkContext()

# Output schema for the grouped-map pandas UDF (group_process): one row per
# hotel with its average rating.
schema = StructType([
    StructField('hotel', StringType()),
    StructField('avg_ratings', DoubleType())
])

# BUG FIX: the original instantiated SQLContext without importing it
# (NameError) — the unused sqlContext was removed — and dropped the
# line-continuation backslash after the output-uri .config() call
# (SyntaxError); the builder chain is repaired below.
spark = SparkSession.builder.master('local')\
    .config('spark.mongodb.input.partitioner', 'MongoShardedPartitioner')\
    .config('spark.mongodb.input.uri', args.mongodb_input_uri)\
    .config('spark.mongodb.output.uri', args.mongodb_output_uri)\
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.1')\
    .getOrCreate()
def get_category_udf(hotel):
    """Look up *hotel* in the 'hotel' database's 'category' collection and
    return its first category string, or None when the hotel is unknown or
    has no categories.

    BUG FIXES: the original leaked one MongoClient per call (never closed)
    and raised TypeError when find_one() matched no document.
    """
    client = MongoClient(args.mongodb_input_uri)
    try:
        row = client['hotel'].category.find_one({'hotel': hotel})
        # find_one returns None when no document matches the filter.
        if row is None:
            return None
        categories = row.get('categories')
        return categories[0] if categories else None
    finally:
        client.close()
def get_category_udf2(hotel):
    """Alternative lookup via the Spark Mongo connector (currently unused by
    main): read the configured input collection filtered to one hotel and
    return the first element of its 'address' field, or None if no row
    matched.

    BUG FIXES: the connector's 'pipeline' option must be a JSON string; the
    original passed a Python dict, whose str() form (single quotes) is not
    valid JSON. json.dumps also escapes the hotel name safely instead of
    interpolating it into an f-string. The original also crashed with
    TypeError on an empty result via .first()[0][0].
    """
    import json  # local import keeps the module's top-level imports untouched
    pipeline = json.dumps({'$match': {'hotel': hotel}})
    df = spark.read.format('com.mongodb.spark.sql.DefaultSource')\
        .option('pipeline', pipeline)\
        .load()
    row = df.select('address').first()
    return row[0][0] if row is not None else None
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def group_process(data):
    """Grouped-map pandas UDF: collapse all rows of one hotel group into a
    single row matching `schema` -> (hotel, avg_ratings).

    BUG FIXES vs. the original:
    - `data['ratings'].sum() // data.count()` floor-divided by a per-column
      count *Series*, producing a Series instead of a scalar, and `//`
      truncated the average despite the DoubleType schema; replaced with a
      true float mean.
    - Concatenating the full N-row 'hotel' column with a 1-row frame using
      ignore_index=True produced NaN-padded rows and positional column names
      (0, 1) that do not match the declared schema.
    """
    return pd.DataFrame({
        'hotel': [data['hotel'].iloc[0]],
        'avg_ratings': [float(data['ratings'].mean())],
    })
if __name__ == '__main__':
    # BUG FIXES: the original used `def __name__ == '__main__':` (SyntaxError,
    # should be `if`) and referenced `col` without importing it — the sort now
    # uses column access on the DataFrame itself, so no extra import is needed.

    # Read the raw hotel documents from the booking.hotel collection.
    df = spark.read.format('com.mongodb.spark.sql.DefaultSource')\
        .option('database', 'booking')\
        .option('collection', 'hotel')\
        .load()

    # Attach each hotel's first category (looked up in MongoDB) as 'address'.
    invert_category_udf = udf(get_category_udf, StringType())
    df = df.withColumn('address', invert_category_udf('hotel'))

    # Average ratings per hotel, keep the top 500, and write the result back
    # to the configured output URI.
    df = df.drop('key').groupby(['hotel']).apply(group_process)
    df = df.orderBy(df['avg_ratings'].desc()).limit(500)
    df.write.format('com.mongodb.spark.sql.DefaultSource').mode('overwrite').save()