In [0]:
# run this command on databricks first, not necessary locally
!pip install scrapy

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-d0ddf361-27ba-4074-a796-d04c4186057d/bin/python -m pip install --upgrade pip' command.[0m


# Part 1 - Parsing Hikes

In the first part of the assignment, you need to extract the relevant attributes from the web pages scraped from hikr.org. Extend the `parse` function so that it extracts all the attributes you need to create the ranking. You may define your own helper functions and extend the `parse` function as necessary. Just keep in mind that the arguments/result types should not be changed to enable you to use the function in the second part of the assignment.

## Chosen Features
The following features are extracted from the hikr.org tour pages:

### 1. Region (`region`)
A string representing the tour's geographical region, as a breadcrumb path (e.g., "World » Italy » Lombardy") <br>
**Cleaning:** Individual parts are whitespace-trimmed and joined by " » ". It will be `None` if not found or if all parts are empty.

### 2. Tour Date (`tour_date`)
The date of the tour, formatted as a "dd.mm.yyyy" string (e.g., "05.06.2010"). <br>
**Cleaning:** The day and month are zero-padded. It will be `None` if the original date string is not found, is in an unexpected format, or contains an unrecognized month name.

### 3. Descent in Meters (`descent_meters`)
The total descent of the tour in meters. <br>
**Cleaning:** The value is converted to lowercase, the "m" unit is removed, and leading/trailing whitespace is stripped (e.g., "600"). It will be `None` if not found.

### 4. Ascent in Meters (`ascent_meters`)
The total ascent of the tour in meters. <br>
**Cleaning:** The value is converted to lowercase, the "m" unit is removed, and leading/trailing whitespace is stripped (e.g., "600"). It will be `None` if not found.

### 5. Peaks (`peaks`)
A list representing the names of the peaks visited during the tour. <br>
**Cleaning:** It will be `None` if no peaks are found.

In [0]:
import scrapy
from scrapy.selector import Selector

# German month names as they appear in hikr.org tour dates, mapped to month numbers.
_GERMAN_MONTHS = {
    'Januar': 1, 'Februar': 2, 'März': 3, 'April': 4, 'Mai': 5, 'Juni': 6,
    'Juli': 7, 'August': 8, 'September': 9, 'Oktober': 10, 'November': 11, 'Dezember': 12
}


def _fiche_value_xpath(label, suffix):
    """Build the XPath for the value cell of the info-table row whose label cell contains `label`."""
    return (
        f'//tr[td[@class="fiche_rando_b" and contains(normalize-space(.), "{label}")]]'
        f'/td[@class="fiche_rando"]{suffix}'
    )


def _format_tour_date(raw_date):
    """Convert a "<day> <German month name> <year>" string to "dd.mm.yyyy", or None if it does not match."""
    if not raw_date:
        return None

    parts = raw_date.split()
    if len(parts) != 3:
        return None

    day_text, month_name, year_text = parts
    month_number = _GERMAN_MONTHS.get(month_name)
    if month_number is None:
        return None

    try:
        # Tolerate a trailing dot after the day ("4." -> 4).
        day = int(day_text.rstrip('.'))
        year = int(year_text)
    except ValueError:
        # A malformed date must yield None instead of raising: one bad page
        # would otherwise abort the whole map over all tours.
        return None

    return f"{day:02d}.{month_number:02d}.{year}"


def _clean_meters(raw_value):
    """Lowercase the value, drop the "m" unit and surrounding whitespace; None if nothing is left."""
    if not raw_value:
        return None
    cleaned = raw_value.lower().replace('m', '').strip()
    return cleaned or None


def parse(tour):
    """Parse a hikr.org tour and extract the attributes we are interested in.

    Parameters:
        tour: A two-element sequence (id, text); id is the filename and text
            is the HTML content of the hikr.org tour page.

    Result:
        A dictionary with the keys 'name', 'region', 'tour_date',
        'descent_meters', 'ascent_meters' and 'peaks'. Every value is None
        when the attribute is missing or cannot be cleaned.
    """
    # The filename is not needed for the extraction itself.
    _tour_id, text = tour

    # Parse it using scrapy
    document = Selector(text=text)

    # Name: strip surrounding whitespace; stays None if the title is missing.
    name_raw = document.css('h1.title::text').get()
    name = name_raw.strip() if name_raw else None

    # 1. Region: trim every breadcrumb part, drop empty ones, join with " » ".
    region_parts_raw = document.xpath(_fiche_value_xpath("Region:", '//a/text()')).getall()
    cleaned_parts = [part.strip() for part in region_parts_raw if part.strip()]
    region = ' » '.join(cleaned_parts) if cleaned_parts else None

    # 2. Tour Date: normalised to "dd.mm.yyyy", or None for unexpected formats.
    tour_date = _format_tour_date(document.xpath(_fiche_value_xpath("Tour Datum:", '/text()')).get())

    # 3. Descent
    descent_meters = _clean_meters(document.xpath(_fiche_value_xpath("Abstieg:", '/text()')).get())

    # 4. Ascent
    ascent_meters = _clean_meters(document.xpath(_fiche_value_xpath("Aufstieg:", '/text()')).get())

    # 5. Peaks: link texts that follow a peak icon in the "Wegpunkte:" row.
    peaks_xpath = '//td[contains(text(),"Wegpunkte:")]/following-sibling::td//img[contains(@src, "ico2_peak_s.png")]/following-sibling::a/text()'
    # An empty result list becomes None.
    peaks = document.xpath(peaks_xpath).getall() or None

    # Assemble the result dictionary
    return {
        'name': name,
        'region': region,
        'tour_date': tour_date,
        'descent_meters': descent_meters,
        'ascent_meters': ascent_meters,
        'peaks': peaks
    }

In [13]:
# Prerequisite: unpack 200posts.zip next to this notebook.
# Smoke test: run parse on one example tour and print the extracted attributes.
with open('200posts/post24013.html', 'r', encoding='utf-8') as f:
    content = f.read()

# f.name is still available after the file has been closed.
r = parse([f.name, content])
print(r)


{'name': 'Hinteres Schöneck (3128m), Ortler Alpen', 'region': 'Welt » Italien » Trentino-Südtirol', 'tour_date': '04.06.2010', 'descent_meters': '1300', 'ascent_meters': '1300', 'peaks': ['Hinteres Schöneck 3128 m   (11)', 'Vorderes Schöneck 2908 m   (5)']}


# Part 2 - Parallelization & Aggregation (Spark)

It is highly recommended to wait with this part until after the Spark lecture!

This part only works on databricks!

Warning: In the community edition, databricks terminates your cluster after 2 hours of inactivity. If you re-create the cluster, you will lose your data.

To add a library such as scrapy, it might not always work with the command above. Should you run into problems, you can alternatively do the following:

- Go to the "Clusters" panel on the left
- Select your cluster
- Go to the "Libraries" tab
- Click "Install New"
- Choose "PyPI" as library source
- Type the name of the library, "scrapy", into the package field
- Click "Install"
- Wait until the installation has finished

You can now use the newly installed library in your code.

In [0]:
# AWS access configuration.
# SECURITY: credentials must never be stored in the notebook itself. They are
# read from the environment (set them in the cluster's environment variables,
# or fetch them from a secret store and export them before running this cell).
# Any key that has ever been committed in plain text must be rotated.
import os

aws_access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]

hadoop_conf = sc._jsc.hadoopConfiguration()
# NOTE(review): these are fs.s3n.* settings while the paths below use the
# s3a:// scheme - confirm these are the settings the connector actually reads.
hadoop_conf.set("fs.s3n.awsAccessKeyId", aws_access_key_id)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", aws_secret_access_key)

# Contains the whole hikr dataset.
# The full dataset contains 42330 tours and has a size of around 3 GB. Use this dataset for your final results if possible.
# Execution is likely to take around 20 to 30 minutes.
# tours = sc.wholeTextFiles("s3a://dawr-hikr3/hikr/*.html")

# There are 8176 posts starting with "post10*", which is a nicer size for smaller experiments. (~ 5 minutes to process)
# tours = sc.wholeTextFiles("s3a://dawr-hikr3/hikr/post10*.html")

# If you want to further shrink the dataset size for testing, you can add another zero (or more) to the pattern (post100*.html).
tours = sc.wholeTextFiles("s3a://dawr-hikr3/hikr/post10*.html")

In [0]:
# Run parse over every tour and keep the results cached (memory first, then disk),
# so that all later steps can reuse them without parsing again.
import pyspark

storage_level = pyspark.StorageLevel.MEMORY_AND_DISK
parsedTours = tours.map(parse).persist(storage_level)

In [0]:
# Actually force the parsedTours RDD. Above it was only defined, but not evaluated;
# count() triggers the evaluation. This will take a while.
parsedTours.count()

Out[64]: 8176

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Row

# Filtering
In this section the parsed tours are filtered. This helps to select the perfect peak for this summer's tour.
This will happen in multiple steps:

## Region Filtering
Since the main language of the tour participants is German, only tours in Switzerland or one of its German-speaking neighbors will be kept, so that the participants feel at home.

## Date Filtering
Tours can change over time due to weather conditions and other factors. Therefore it is important to plan based on current data. For this reason, only tours dated on or after the 1st of January 2015 will be kept.

## Descent Filtering
Generally, people prefer tours that don't have a big descent. Long descents are hard on the knees and can be dangerous. Therefore, all tours that have a greater descent than ascent will be filtered out.

In [0]:
# Explicit alias for the Spark SQL functions, so this cell does not depend on
# names injected by a wildcard import.
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Explicit schema for the dictionaries produced by parse. With a fixed schema
# Spark does not need to infer column types from the data, and the RDD can be
# converted directly instead of collecting every tour to the driver first.
tour_schema = StructType([
    StructField("ascent_meters", StringType(), True),
    StructField("descent_meters", StringType(), True),
    StructField("name", StringType(), True),
    StructField("peaks", ArrayType(StringType()), True),
    StructField("region", StringType(), True),
    StructField("tour_date", StringType(), True),
])

# Save the parsedTours to a DataFrame
df_tours = spark.createDataFrame(parsedTours, schema=tour_schema)

# Only keep the German-speaking neighbors of Switzerland, including Switzerland
countries_to_keep = [
    'Schweiz',
    'Deutschland',
    'Österreich',
    'Liechtenstein'
]

print(f"Original tour count: {df_tours.count()}")

# --- Region filter ---------------------------------------------------------
# OR together one "region contains <country>" test per country, starting from False.
# Rows without a region evaluate to null and are therefore dropped by filter().
filter_condition = F.lit(False)
for country in countries_to_keep:
    filter_condition = filter_condition | F.col('region').contains(country)

df_tours_region = df_tours.filter(filter_condition)

print(f"Filtered Region tour count: {df_tours_region.count()}")

# --- Date filter -----------------------------------------------------------
# Convert the tour_date column to a date type
df_tours_with_date = df_tours_region.withColumn(
    "parsed_tour_date", F.to_date(F.col("tour_date"), "dd.MM.yyyy")
)

# Define cutoff date
cutoff_date = F.to_date(F.lit("01.01.2015"), "dd.MM.yyyy")

# Keep tours on or after the cutoff date; the helper column is dropped again.
df_tours_recent = df_tours_with_date.filter(
    F.col("parsed_tour_date") >= cutoff_date
).drop("parsed_tour_date")

print(f"Filtered Date tour count: {df_tours_recent.count()}")

# --- Descent filter --------------------------------------------------------
# Ensure that the ascent and descent values are numeric
df_tours_numeric_metrics = (
    df_tours_recent
    .withColumn("ascent_val", F.col("ascent_meters").cast("float"))
    .withColumn("descent_val", F.col("descent_meters").cast("float"))
)

# Keep tours where the ascent is greater than or equal to the descent.
# Missing values are treated as 0.
df_tours_filtered = df_tours_numeric_metrics.filter(
    F.coalesce(F.col("descent_val"), F.lit(0.0)) <= F.coalesce(F.col("ascent_val"), F.lit(0.0))
).drop("ascent_val", "descent_val")

print(f"Filtered Descent tour count: {df_tours_filtered.count()}")

Original tour count: 8176
Filtered Region tour count: 5573
Filtered Date tour count: 4624
Filtered Descent tour count: 4076


In [0]:
# Print a preview of the tours that passed all filters.
df_tours_filtered.show()

+-------------+--------------+--------------------+--------------------+--------------------+----------+
|ascent_meters|descent_meters|                name|               peaks|              region| tour_date|
+-------------+--------------+--------------------+--------------------+--------------------+----------+
|          700|           700|Läged Windgällen ...|[Chli Geissberg 1...|Welt » Schweiz » Uri|27.09.2015|
|          720|           720|     Iseler im Nebel|[Iseler 1876 m   ...|Welt » Österreich...|27.09.2015|
|          450|           450|Monte Tamaro - (a...|[Monte Tamaro 196...|Welt » Schweiz » ...|20.09.2015|
|          200|           200|Rund um den See (...|                null|Welt » Schweiz » Uri|27.09.2015|
|          650|           650|    Taborberg 1'618m|                null|Welt » Österreich...|27.09.2015|
|          350|           350|Gipfelsammeln im ...|[Predigtstuhl 161...|Welt » Deutschlan...|28.09.2015|
|         1050|          1050|La Maya (St Marti...|[La 

## Part 2 Final ranking
List your final top 10 mountain peaks that occur the most often within your filtered tours. State how you handle cases where two peaks occur the same number of times.

### Special case handling
The code first expands tours with multiple peaks into individual rows, then groups them to count occurrences and sum total ascent/descent to get the total height meters. Finally, it lists the top 10 peaks, prioritizing frequency and then total height meters.

In [0]:
# Explicit alias for the Spark SQL functions, so this cell does not depend on
# names injected by a wildcard import.
from pyspark.sql import functions as F

# This flattens the data so each row represents a single peak associated with a tour.
# explode() drops tours whose peaks column is null.
df_peaks = df_tours_filtered.withColumn("peaks", F.explode(F.col("peaks")))

# Create a view to let SQL queries run directly on the DataFrame
df_peaks.createOrReplaceTempView("peak_ranking")

# ascent_meters / descent_meters are string columns, so they are cast explicitly
# before being added. The final "peaks ASC" key makes the ranking deterministic
# when two peaks share both the count and the total height meters.
ranking_query = """
SELECT
  peaks,
  COUNT(*) AS count,
  SUM(CAST(ascent_meters AS DOUBLE) + CAST(descent_meters AS DOUBLE)) AS total_height_meters
FROM
  peak_ranking
GROUP BY
  peaks
ORDER BY
  count DESC,
  total_height_meters DESC,
  peaks ASC
LIMIT 10
"""

ranking_df = spark.sql(ranking_query)
ranking_df.show(truncate=False)

+------------------------------------------+-----+-------------------+
|peaks                                     |count|total_height_meters|
+------------------------------------------+-----+-------------------+
|Monte Lema 1621 m   (153)                 |18   |34560.0            |
|Balmfluechöpfli 1289 m   (153)            |17   |17068.0            |
|Monte Gradiccioli 1936 m   (140)          |14   |33400.0            |
|Monte Generoso / Calvagione 1701 m   (246)|14   |25920.0            |
|Grosser Mythen 1898 m   (225)             |13   |18380.0            |
|Röti 1395 m   (117)                       |13   |15268.0            |
|Moncucco 1518 m   (41)                    |12   |20060.0            |
|Säntis 2502 m   (409)                     |11   |18879.0            |
|Schnebelhorn 1292 m   (180)               |11   |18743.0            |
|Rigi Kulm 1798 m   (206)                  |11   |10669.0            |
+------------------------------------------+-----+-------------------+



# Part 3 - Analysis of data quality
Add further code for analysis of data quality here. Don't forget to include at least one aggregation, such as average tour length per season.