-
Notifications
You must be signed in to change notification settings - Fork 0
/
exercise.py
39 lines (28 loc) · 1.32 KB
/
exercise.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from pyparsing import col
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StructType
def init_spark_session():
    """Return the active SparkSession named "test", creating it if needed."""
    builder = SparkSession.builder.appName("test")
    return builder.getOrCreate()
def test(session, f, prefix):
    """Flatten one StructField into a list of pyspark Column objects.

    Args:
        session: active SparkSession, passed through to recursive calls.
        f: pyspark.sql.types.StructField to flatten.  NOTE: this parameter
           shadows the module-level ``pyspark.sql.functions as f`` alias,
           which is why the column constructor is imported locally below.
        prefix: dotted parent path (e.g. ``"a.b"``), or None at the top level.

    Returns:
        A list of Column objects — the recursively flattened leaves of a
        nested struct, or a single-element list for a leaf field.
    """
    # Bug fix: the file's top-level `from pyparsing import col` brought in
    # pyparsing's text-position helper, not Spark's column constructor.
    # Import the correct one at function scope (the `f` parameter shadows
    # the `functions as f` module alias, so `f.col` is not available here).
    from pyspark.sql.functions import col

    colName = f.name
    if prefix is not None:
        colName = prefix + "." + f.name
    # Bug fix: the original wrote `f.dataType == StructType`, comparing an
    # *instance* to the class — always False, so nested structs were never
    # recursed into.  isinstance() is the correct test.
    if isinstance(f.dataType, StructType):
        return flattenSchema(session, f.dataType, colName)
    else:
        return [col(colName)]
def flattenSchema(session, schema: StructType, prefix: str):
    """Recursively flatten *schema* into a flat list of Column objects.

    Args:
        session: active SparkSession (kept for interface compatibility and
            passed through to the per-field helper).
        schema: the StructType to flatten.
        prefix: dotted parent path for nested fields, or None at the root.

    Returns:
        A list of pyspark Column objects, one per leaf field.

    Bug fixes vs. the original:
      * the lambda called ``test(f, prefix)``, dropping the required
        ``session`` argument — a TypeError on every element;
      * the flatMap result was never returned, so the function always
        yielded None;
      * ``parallelize(...).flatMap(...)`` shipped a driver-side, purely
        structural walk to the cluster, and closing over the SparkSession
        inside a task is not serializable anyway.  A plain driver-side
        comprehension is correct and cheaper.
    """
    return [c for field in schema.fields for c in test(session, field, prefix)]
if __name__ == '__main__':
    session = init_spark_session()
    # Each JSON file holds a top-level array column; f.explode() produces one
    # row per array element (the exploded column is named "col" by default,
    # which is why later selects reference "col.…" paths).
    prize = session.read.json("ds/prize.json").select(f.explode("prizes"))
    laureate = session.read.json("ds/laureate.json").select(f.explode("laureates"))
    prize.printSchema()
    prize.select("col").show()
    # One row per (laureate id, prize year) pair.
    # NOTE(review): assumes each prize struct carries a `laureates` array with
    # an `id` field — confirm against ds/prize.json.
    prizeid_year = prize.select(f.explode("col.laureates.id").alias("person_id"), "col.year") #.where(f.isnull("col.laureates.id")).show()
    prizeid_year.printSchema()
    # Inner join: attach full laureate details to each prize row by id.
    join =prizeid_year.join(laureate, prizeid_year["person_id"] == laureate["col.id"])
    join.printSchema()
    # Prize count per birth country.  NOTE(review): the orderBy before
    # groupBy has no effect on the aggregated result — verify it is not
    # relied upon for output ordering.
    join.select("year", "col.bornCountry").orderBy(f.desc("year")).groupBy("bornCountry").count().show()
    #.groupBy("col.bornCountry").count().show()