/
queryplans_rdd.py
22 lines (17 loc) · 1.26 KB
/
queryplans_rdd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time
from sys import argv
from pyspark import RDD
from pyspark.sql import SparkSession, DataFrame
from tutorials.module1.python.utilities.helper_python import create_session, extract_raw_records, parse_raw_warc, parse_raw_wet
# Default input locations, used when the corresponding path is not passed on
# the command line.
DEFAULT_WARC_PATH = '/Users/me/IdeaProjects/bdrecipes/resources/CC-MAIN-20191013195541-20191013222541-00000.warc'  # ToDo: Modify path
DEFAULT_WET_PATH = '/Users/me/IdeaProjects/bdrecipes/resources/CC-MAIN-20191013195541-20191013222541-00000.warc.wet'  # ToDo: Modify path


def resolve_input_paths(args):
    """Return ``(warc_path, wet_path)`` chosen from a ``sys.argv``-style list.

    ``args[1]`` overrides the WARC path and ``args[2]`` overrides the WET path.
    Each path falls back to its default independently, so a caller who passes
    only the WARC path gets that path used rather than silently ignored.
    """
    warc_path = args[1] if len(args) > 1 else DEFAULT_WARC_PATH
    wet_path = args[2] if len(args) > 2 else DEFAULT_WET_PATH
    return warc_path, wet_path


if __name__ == "__main__":
    input_loc_warc, input_loc_wet = resolve_input_paths(argv)
    # NOTE(review): 3 is passed straight through to the project helper;
    # presumably the number of local cores/threads - confirm in create_session.
    spark: SparkSession = create_session(3, 'Hermeneutics Exercise RDD')
    try:
        # The parsers are handed to flatMap directly; wrapping them in a
        # lambda that only forwards its argument adds nothing.
        warc_records: RDD = extract_raw_records(input_loc_warc, spark).flatMap(parse_raw_warc)
        wet_records: RDD = extract_raw_records(input_loc_wet, spark).flatMap(parse_raw_wet)
        # Key both record sets by target URI so they can be joined.
        pair_warc: RDD = warc_records.map(lambda warc: (warc.target_uri, warc.language))
        pair_wet: RDD = wet_records.map(lambda wet: (wet.target_uri, wet.plain_text))
        # Each joined element is (target_uri, (language, plain_text)).
        joined: RDD = pair_warc.join(pair_wet)
        spanish_records: RDD = joined.filter(lambda uri_and_values: uri_and_values[1][0] == 'es')
        print(f'@@ Result: {spanish_records.count()}')  # original note: 133 for the default input files
        time.sleep(10 * 60)  # Keep the application alive for exploring the WebUI
    finally:
        # Release the Spark session even if the job fails or the sleep is interrupted.
        spark.stop()