# Part 1 - Parsing Hikes

In the first part of the assignment, you need to extract the relevant attributes from the web pages scraped from hikr.org. Extend the `parse` function so that it extracts all the attributes you need to create the ranking. You may define your own helper functions and extend the `parse` function as necessary. Just keep in mind that the arguments/result types should not be changed to enable you to use the function in the second part of the assignment.

In [1]:
import scrapy
from scrapy.selector import Selector
from datetime import datetime, date, timedelta
import locale
import lib.parser_functions as pf
from typing import List
locale.setlocale(locale.LC_TIME, 'de_DE')
from dataclasses import dataclass
from lib.parser_functions import HikingDifficulty, ClimbingDifficulty, MountainBikeDifficulty, SnowshoeTourDifficulty, Waypoint, Peak, TourPartner

@dataclass
class ParsedTour:
    name: str
    id: str
    author_public_name: str
    author_internal_name: str
    author_id: str
    publishing_date_str: str
    publishing_date: datetime
    photo_count: int
    peaks: List[Peak]
    regions: dict
    tour_date: date
    waypoints: List[Waypoint]
    hiking_difficulty: HikingDifficulty
    ascent: int
    descent: int
    duration: timedelta
    climbing_difficulty: ClimbingDifficulty
    hightour_difficulty: str
    mountain_bike_difficulty: MountainBikeDifficulty
    via_ferrata_difficulty: str
    ski_difficulty: str
    snowshoe_difficulty: SnowshoeTourDifficulty
    tour_partner: List[TourPartner]
    page_views: int
# Parses a hikr.org tour and extracts all the attributes we are interested in.
# Parameters:
#   tour: HTML Content of the hikr.org tour.
# Result:
#   A dictionary containing the extracted attributes for this tour.
def parse(tour) -> ParsedTour:
    # id is the filename, text is the file content
    [id, text] = tour
    #id:  ./data/raw/200posts/post24001.html
    tour_id = id.split('/')[-1].split('.')[0].replace('post', '')
    # Parse it using scrapy
    document = Selector(text=text)
    # Do some extraction

    # TODO: Extract more attributes and add them to the result dictionary!
    publishing_date_str = document.css('div.author::text').re_first(r'\d{1,2}\. \w+ \d{4} um \d{2}:\d{2}')
    publishing_date = datetime.strptime(publishing_date_str, '%d. %B %Y um %H:%M') if publishing_date_str else None
    author_id = document.css('img[id^="anchor_author_"]::attr(id)').re_first(r'anchor_author_(\d+)')
    raw_object = {

    }

    for attribute in pf.columns:
        raw_content = document.css(f'td.fiche_rando_b:contains("{attribute}") + td.fiche_rando::text').get()
        if raw_content:
            if raw_content.strip() == '':
                raw_object[f'{attribute}_raw'] = document.css(f'td.fiche_rando_b:contains("{attribute}") + td.fiche_rando').get()
            else:
                raw_object[f'{attribute}_raw'] = raw_content.strip()
        else:
            raw_object[f'{attribute}_raw'] = None

    result = {
        'name': document.css('h1.title::text').get(),
        'id': tour_id,
        'author_public_name': document.css('div.author a.standard::text').get(),
        'author_internal_name': document.css('img[id^="anchor_author_"]::attr(onmouseover)').re_first(r'"https://www.hikr.org/","\d+","(.*?)","'),
        'author_id': author_id,
        'publishing_date_str': publishing_date_str,
        'publishing_date': publishing_date,
        'photo_count': pf.count_photos(text),
        'peaks': pf.parse_peak_map(text),
        'regions': pf.parse_region(raw_object['Region:_raw']),
        'tour_date': pf.parse_tour_date(raw_object['Tour Datum:_raw']),
        'waypoints': pf.parse_waypoints(raw_object['Wegpunkte:_raw']),
        'hiking_difficulty': pf.parse_hiking_difficulty(raw_object['Wandern Schwierigkeit:_raw']),
        'ascent': pf.parse_ascent(raw_object['Aufstieg:_raw']),
        'descent': pf.parse_descent(raw_object['Abstieg:_raw']),
        'duration': pf.parse_duration(raw_object['Zeitbedarf:_raw']),
        'climbing_difficulty': pf.parse_climbing_difficulty(raw_object['Klettern Schwierigkeit:_raw']),
        'hightour_difficulty': pf.parse_high_tour_difficulty(raw_object['Hochtouren Schwierigkeit:_raw']),
        'mountain_bike_difficulty': pf.parse_mountainbike_difficulty(raw_object['Mountainbike Schwierigkeit:_raw']),
        'via_ferrata_difficulty': pf.parse_via_ferrata_difficulty(raw_object['Klettersteig Schwierigkeit:_raw']),
        'ski_difficulty': pf.parse_ski_difficulty(raw_object['Ski Schwierigkeit:_raw']),
        'snowshoe_difficulty': pf.parse_snowshoe_tour_difficulty(raw_object['Schneeshuhtouren Schwierigkeit:_raw']),
        'tour_partner': pf.parse_tour_partners(text),
        'page_views': pf.parse_page_views(text)
    }
    return result


## Helper Functions

All the helper Function can be found in `/lib/parser_functions.py`

And the Jupyternotebook on how I extracted them: `preproccesing_notebook.ipynb`

In [2]:
# Extract the 200posts.zip file in the same folder where this jupyter notebook is located.
# Then you can run the parse function on an example tour:
with open('./data/raw/200posts/post24001.html') as f:
    content = f.read()
    r = parse([f.name, content])
    print(r)



{'name': 'monte S. Primo m.1685 (CO) ', 'id': '24001', 'author_public_name': 'Alberto', 'author_internal_name': 'alberto', 'author_id': '2823', 'publishing_date_str': '11. Juni 2010 um 09:25', 'publishing_date': datetime.datetime(2010, 6, 11, 9, 25), 'photo_count': 2, 'peaks': [], 'regions': {'region_0_content': 'Welt', 'country': 'Italien', 'region_2_content': 'Lombardei'}, 'tour_date': datetime.date(2010, 6, 5), 'waypoints': None, 'hiking_difficulty': HikingDifficulty(hiking_difficulty='T2', hiking_difficulty_description='Bergwandern'), 'ascent': 600, 'descent': 600, 'duration': datetime.timedelta(days=1), 'climbing_difficulty': None, 'hightour_difficulty': None, 'mountain_bike_difficulty': None, 'via_ferrata_difficulty': None, 'ski_difficulty': None, 'snowshoe_difficulty': None, 'tour_partner': [{'name': 'Alberto', 'user_id': 'alberto'}], 'page_views': 109}


In [3]:
import os
files_dir = './data/raw/200posts'
for filename in os.listdir(files_dir):
    if filename.endswith('.html'):
        print(filename)
        file_path = os.path.join(files_dir, filename)
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
            r = parse([f.name, content])

post24199.html
post24064.html
post24208.html
post24025.html
post24160.html
post24249.html
post24320.html
post24273.html
post24048.html
post24224.html
post24195.html
post24204.html
post24212.html
post24091.html
post24183.html
post24117.html
post24140.html
post24005.html
post24156.html
post24013.html
post24228.html
post24101.html
post24044.html
post24157.html
post24012.html
post152050.html
post24141.html
post24004.html
post24268.html
post24053.html
post24182.html
post24213.html
post24086.html
post24194.html
post24317.html
post24225.html
post24049.html
post24321.html
post24008.html
post24233.html
post24024.html
post24161.html
post24136.html
post24209.html
post152027.html
post24065.html
post24120.html
post24032.html
post24198.html
post24177.html
post24193.html
post24202.html
post24081.html
post24214.html
post24078.html
post24243.html
post24185.html
post152016.html
post24111.html
post24054.html
post24146.html
post24003.html
post24296.html
post24150.html
post24015.html
post24042.html
post241

# Part 2 - Parallelization & Aggregation (Spark)

NOTE: It is highly recommended to wait with this part until after the Spark lecture!

NOTE: This part only works on databricks!

To add a library such as scrapy, perform the following steps:

- Go to the "Clusters" panel on the left
- Select your cluster
- Go to the "Libraries" tab
- Click "Install New"
- Choose "PyPI" as library source
- Type the name of the library, "scrapy", into the package field
- Click "Install"
- Wait until the installation has finished

You can now use the newly installed library in your code.

Note: In the community edition, databricks terminates your cluster after 2 hours of inactivity. If you re-create the cluster, you will have to perform these steps again.

In [13]:
from pyspark.sql import SparkSession
from dotenv import load_dotenv
import os
load_dotenv() 



    #.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    #    .config("spark.sql.files.maxPartitionBytes", 1024 * 1024 * 4)
spark = SparkSession.builder \
    .appName("S3Example") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "15") \
    .config("spark.cores.max", "15") \
    .config("spark.sql.shuffle.partitions", 100) \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.default.parallelism", 100) \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.files.openCostInBytes", "1073741824" ) \
    .config("spark.driver.extraJavaOptions", "-XX:+UnlockExperimentalVMOptions -XX:+UseJVMCICompiler") \
    .config("spark.executor.extraJavaOptions", "-XX:+UnlockExperimentalVMOptions -XX:+UseJVMCICompiler") \
    .getOrCreate()

spark.conf.set("spark.sql.files.maxPartitionBytes", 1024 * 1024 * 4)
spark.conf.set("spark.sql.files.openCostInBytes", "1073741824")  
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", os.getenv('S3_KEY_ID'))
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", os.getenv('S3_SECRET_KEY'))
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv('S3_KEY_ID'))
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.getenv('S3_SECRET_KEY'))
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")


# Contains the whole hikr dataset.
# The full dataset contains 113710 tours and has a size of around 6 GB.
# There are 46854 posts starting with "post1*". Use this dataset for your final results if possible. Execution is likely to take around 30~45 minutes.
# There are 8176 posts starting with "post10*", which is a nicer size for smaller experiments.
# If you want to further shrink the dataset size for testing, you can add another zero to the pattern (post100*.html).
#tours = sc.wholeTextFiles("s3a://dawr-hikr/post10000*.html")
#tours = sc.wholeTextFiles("s3a://dawr-hikr/post10*.html")

In [None]:
spark.stop()

In [5]:
# Print spark ui link
print('Spark UI running on http://YOURIPADDRESS:' + spark.sparkContext.uiWebUrl.split(':')[2])

Spark UI running on http://YOURIPADDRESS:4040


In [14]:
from pyspark.sql.functions import input_file_name, col, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType, TimestampType, ArrayType


tours_df = spark.read.text("s3a://dawr-hikr/post10*.html", wholetext=True).withColumn("file_path", input_file_name())



In [6]:
parse_udf = udf(lambda content, file_path: parse([file_path, content]), returnType=StructType([
    StructField("name", StringType(), nullable=True),
    StructField("id", StringType()),
    StructField("author_public_name", StringType()),
    StructField("author_internal_name", StringType()),
    StructField("author_id", StringType()),
    StructField("publishing_date_str", StringType()),
    StructField("publishing_date", TimestampType()),
    StructField("photo_count", IntegerType()),
    StructField("peaks", ArrayType(StructType([
        StructField("latitude", FloatType()),
        StructField("longitude", FloatType()),
        StructField("name", StringType()),
        StructField("height", IntegerType()),
        StructField("id", IntegerType())
    ]))),
    StructField("regions", StructType([
        StructField("region_0_content", StringType()),
        StructField("country", StringType()),
        StructField("region_2_content", StringType()),
        StructField("region_3_content", StringType()),
        StructField("region_4_content", StringType())
    ])),
    StructField("tour_date", DateType()),
    StructField("waypoints", ArrayType(StructType([
        StructField("image", StringType()),
        StructField("name_raw", StringType()),
        StructField("type", StringType()),
        StructField("waypoint_url", StringType()),
        StructField("height", IntegerType()),
        StructField("name", StringType()),
        StructField("peak_id", StringType())
    ]))),
    StructField("hiking_difficulty", StructType([
        StructField("hiking_difficulty", StringType()),
        StructField("hiking_difficulty_description", StringType())
    ])),
    StructField("ascent", IntegerType()),
    StructField("descent", IntegerType()),
    StructField("duration", StringType()),  # Spark SQL doesn't have a built-in type for timedelta
    StructField("climbing_difficulty", StructType([
        StructField("climbing_difficulty", StringType()),
        StructField("climbing_difficulty_description", StringType())
    ])),
    StructField("hightour_difficulty", StringType()),
    StructField("mountain_bike_difficulty", StructType([
        StructField("mountainbike_difficulty", StringType()),
        StructField("mountainbike_difficulty_description", StringType())
    ])),
    StructField("via_ferrata_difficulty", StringType()),
    StructField("ski_difficulty", StringType()),
    StructField("snowshoe_difficulty", StructType([
        StructField("snowshoe_tour_difficulty", StringType()),
        StructField("snowshoe_tour_difficulty_description", StringType())
    ])),
        StructField("tour_partner", ArrayType(StructType([
        StructField("name", StringType()),
        StructField("user_id", StringType()),
    ]))),
    StructField("page_views", IntegerType()),
]))


In [7]:
parsedTours_df = tours_df.select(parse_udf(col("value"), col("file_path")).alias("parsed_data")).select("parsed_data.*")

parsedTours_df.cache()

# Show the parsed DataFrame
parsedTours_df.show()

                                                                                

+--------------------+------+--------------------+--------------------+---------+--------------------+-------------------+-----------+--------------------+--------------------+----------+--------------------+--------------------+------+-------+--------------------+-------------------+-------------------+------------------------+----------------------+--------------+--------------------+--------------------+----------+
|                name|    id|  author_public_name|author_internal_name|author_id| publishing_date_str|    publishing_date|photo_count|               peaks|             regions| tour_date|           waypoints|   hiking_difficulty|ascent|descent|            duration|climbing_difficulty|hightour_difficulty|mountain_bike_difficulty|via_ferrata_difficulty|ski_difficulty| snowshoe_difficulty|        tour_partner|page_views|
+--------------------+------+--------------------+--------------------+---------+--------------------+-------------------+-----------+--------------------+-

In [8]:
parsedTours_df.count()

                                                                                

8176

In [9]:
parsedTours_df.createOrReplaceTempView("parsedTours")



spark.sql("Select * from parsedTours limit 10").show()
# Create Ranking for the Peaks with the most tours
spark.sql("""
    SELECT 
        exploded_peaks.col.name AS peak_name,
        exploded_peaks.col.id AS peak_id, 
        exploded_peaks.col.height AS peak_height ,   
        COUNT(*) AS tour_count
    FROM parsedTours
    LATERAL VIEW explode(peaks) exploded_peaks
    LATERAL VIEW explode(tour_partner) exploded_partners
    GROUP BY 
          exploded_peaks.col
    ORDER BY tour_count DESC, peak_height DESC
    """).show()


+--------------------+------+--------------------+--------------------+---------+--------------------+-------------------+-----------+--------------------+--------------------+----------+--------------------+--------------------+------+-------+--------------------+-------------------+-------------------+------------------------+----------------------+--------------+--------------------+--------------------+----------+
|                name|    id|  author_public_name|author_internal_name|author_id| publishing_date_str|    publishing_date|photo_count|               peaks|             regions| tour_date|           waypoints|   hiking_difficulty|ascent|descent|            duration|climbing_difficulty|hightour_difficulty|mountain_bike_difficulty|via_ferrata_difficulty|ski_difficulty| snowshoe_difficulty|        tour_partner|page_views|
+--------------------+------+--------------------+--------------------+---------+--------------------+-------------------+-----------+--------------------+-



+--------------------+-------+-----------+----------+
|           peak_name|peak_id|peak_height|tour_count|
+--------------------+-------+-----------+----------+
|Rifugio Antoniett...|  19337|       1365|        50|
|           Alpe Cova|  32366|       1311|        48|
|         Alpe Devero|  10282|       1631|        46|
|     Alpe del Vicerè|  23035|        900|        46|
|          Wasserauen|   6854|        868|        46|
|  Bocchetta di Lemna|  24364|       1167|        45|
|            Wildhaus|   6853|       1090|        43|
|Bocchetta di Palanzo|  24365|       1210|        42|
|Cappella Sacro Cuore|  28975|        828|        42|
|    Rifugio Brioschi|  21151|       2410|        40|
| Bivacco Riva-Girani|  27701|       1862|        38|
|      Rifugio Azzoni|   6616|       1860|        37|
|        San Bernardo|  35186|       1620|        35|
| Rifugio Alp de Volt|  36471|       1340|        35|
|       Passo Forcora|  20149|       1190|        35|
|     Monte Bolettone|  1709

                                                                                

In [11]:
from pyspark.sql.functions import col, avg, min, max, count

# Assuming you have a DataFrame named 'parsedTours' with the parsed tour data

aggregated_data = parsedTours_df.groupBy("regions.country") \
    .agg(
        count("*").alias("total_tours"),
        avg("ascent").alias("avg_ascent"),
        min("ascent").alias("min_ascent"),
        max("ascent").alias("max_ascent"),
        avg("photo_count").alias("avg_photo_count")
    ) \
    .orderBy(col("total_tours").desc())

aggregated_data.show(truncate=False)



+---------------+-----------+------------------+----------+----------+------------------+
|country        |total_tours|avg_ascent        |min_ascent|max_ascent|avg_photo_count   |
+---------------+-----------+------------------+----------+----------+------------------+
|Schweiz        |3882       |1042.3350815850815|5         |7630      |24.113858835651726|
|Italien        |1837       |1109.2795484727756|30        |9900      |27.470332063146433|
|Österreich     |907        |1018.928927680798 |5         |4980      |21.177508269018745|
|Deutschland    |761        |812.5855728429985 |25        |4053      |23.692509855453352|
|Spanien        |137        |601.9017857142857 |20        |2000      |26.255474452554743|
|Frankreich     |131        |942.0943396226415 |45        |5700      |17.27480916030534 |
|United States  |64         |413.14814814814815|30        |1550      |18.53125          |
|Norwegen       |52         |1186.3863636363637|200       |2200      |24.442307692307693|
|Island   

                                                                                