# Part 1 - Parsing Hikes

In the first part of the assignment, you need to extract the relevant attributes from the web pages scraped from hikr.org. Extend the `parse` function so that it extracts all the attributes you need to create the ranking. You may define your own helper functions and extend the `parse` function as necessary. Just keep in mind that the argument and result types must not be changed, so that you can still use the function in the second part of the assignment.

In [None]:
%pip install scrapy

Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.
Note: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.


In [None]:
from scrapy.selector import Selector

# Parses a hikr.org tour and extracts all the attributes we are interested in.
# Parameters:
#   tour: A [path, html] pair: the location of the scraped page and its HTML content.
# Result:
#   A dictionary containing the extracted (still untransformed) attributes for this tour.
def parse(tour):
    path, text = tour
    document = Selector(text=text)

    # The id is the file name up to its first dot,
    # e.g. '200posts/post24010.html' -> 'post24010'.
    file_name = path.rsplit('/', 1)[-1]
    tour_id = file_name.partition('.')[0]

    # Attributes for which only the first matching text node is kept.
    single_value_selectors = {
        'name': 'h1.title::text',
        'date': 'td.fiche_rando_b:contains("Tour Datum:") + td.fiche_rando::text',
        'duration': 'td.fiche_rando_b:contains("Zeitbedarf:") + td.fiche_rando::text',
        'ascent': 'td.fiche_rando_b:contains("Aufstieg:") + td.fiche_rando::text',
        'descent': 'td.fiche_rando_b:contains("Abstieg:") + td.fiche_rando::text',
    }
    # Attributes for which every matching text node is kept (as a list).
    multi_value_selectors = {
        'difficulties': 'td.fiche_rando_b:contains("Schwierigkeit:") + td.fiche_rando a::text',
        'difficulty_labels': 'td.fiche_rando_b:contains("Schwierigkeit:")::text',
        'routepoints': 'td.fiche_rando_b:contains("Wegpunkte:") + td.fiche_rando ul li a::text',
    }

    result = {'id': tour_id}
    for attribute, selector in single_value_selectors.items():
        result[attribute] = document.css(selector).get()
    for attribute, selector in multi_value_selectors.items():
        result[attribute] = document.css(selector).getall()

    # The view counter is the bold text inside the "Diese Seite wurde ..." div.
    result['views'] = document.xpath("//div[contains(text(), 'Diese Seite wurde')]/b/text()").get()
    return result

In [None]:
import re

# "<name> <height> m": the trailing \b makes sure the "m" is a unit on its own
# and not merely the first letter of a following word (e.g. "Alp 2 mit ...").
_ROUTEPOINT_PATTERN = re.compile(r'(.+?)\s(\d+)\s*m\b')


def transform_routepoints(routepoints):
    """Split raw waypoint strings into a name and a height in metres.

    Parameters:
        routepoints: Iterable of waypoint strings such as 'Säntis 2502 m'.

    Returns:
        A list with one dict per waypoint, each with the keys 'name' (str)
        and 'height' (int). When no '<number> m' part is found, 'name' is
        the stripped input and 'height' is None.
    """
    transformed = []

    for point in routepoints:
        match = _ROUTEPOINT_PATTERN.match(point)
        if match is None:
            transformed.append({'name': point.strip(), 'height': None})
            continue
        transformed.append({
            'name': match.group(1).strip(),
            'height': int(match.group(2)),
        })

    return transformed


# German month names as they appear in the tour date, mapped to two-digit month numbers.
month_mappings = {
    'Januar': '01', 'Februar': '02', 'März': '03', 'April': '04', 'Mai': '05', 'Juni': '06',
    'Juli': '07', 'August': '08', 'September': '09', 'Oktober': '10', 'November': '11', 'Dezember': '12'
}


def transform_date(date):
    """Convert a German '<day> <month name> <year>' date into 'YYYY-MM-DD'.

    Parameters:
        date: Raw date text (e.g. '21 Juli 2009') or None.

    Returns:
        The date as a 'YYYY-MM-DD' string, or None when the input is missing
        or does not have the expected shape (wrong number of parts, unknown
        month name, non-numeric day or year). Returning None instead of
        raising keeps a single malformed page from aborting a whole batch.
    """
    if not date:
        return None

    parts = date.split()
    if len(parts) != 3:
        return None

    day, month, year = parts
    day = day.rstrip('.')  # tolerate the ordinal form "21."
    month_number = month_mappings.get(month)
    if month_number is None or not day.isdecimal() or not year.isdecimal():
        return None

    return f'{year}-{month_number}-{day.zfill(2)}'


# Optional "<n> Tag"/"<n> Tage" part followed by an optional "<hours>:<minutes>" part.
_DURATION_PATTERN = re.compile(r'(?:(\d+)\s*Tage?)?\s*(?:(\d+):(\d+))?')


def transform_duration(duration):
    """Convert a raw duration text into a number of hours.

    Accepted forms are '<hours>:<minutes>' (e.g. '5:30'), '<n> Tage' /
    '<n> Tag', and a combination of both (e.g. '2 Tage 3:30'). A day counts
    as 24 hours.

    Parameters:
        duration: Raw duration text or None.

    Returns:
        The duration in hours as a float rounded to two decimals, or None
        when the input is missing or matches none of the accepted forms.
    """
    if not duration:
        return None

    match = _DURATION_PATTERN.fullmatch(duration.strip())
    if match is None:
        return None

    days, hours, minutes = match.groups()
    # Every part of the pattern is optional, so an empty string matches too.
    if days is None and hours is None:
        return None

    total_hours = int(days or 0) * 24 + int(hours or 0) + int(minutes or 0) / 60
    return round(total_hours, 2)


def transform_ascent(ascent):
    """Convert a raw ascent text such as '1200 m' into an integer.

    Returns None for a missing (None or empty) value. Otherwise the text in
    front of the first ' m' is converted with int(), which raises ValueError
    when that text is not an integer.
    """
    if not ascent:
        return None
    metres, _, _ = ascent.strip().partition(' m')
    return int(metres)


def transform_descent(descent):
    """Convert a raw descent text such as '950 m' into an integer.

    Returns None for a missing (None or empty) value. Otherwise the text in
    front of the first ' m' is converted with int(), which raises ValueError
    when that text is not an integer.
    """
    if not descent:
        return None
    metres, _, _ = descent.strip().partition(' m')
    return int(metres)


# For every known tour category: the grade codes of its difficulty scale,
# each paired with the numeric difficulty it is mapped to.
category_mappings = {
    'Eisklettern': [('WI1', 1), ('WI2', 2), ('WI3', 3), ('WI4', 4), ('WI5', 5), ('WI6', 6), ('WI7', 7)],
    'Klettern': [
        ('K1', 1), ('K2', 2), ('K3', 3), ('K4', 4), ('K5', 5), ('K6', 6),
        ('I', 1), ('II', 2), ('III', 3), ('IV', 4), ('V', 5), ('VI', 6), ('VII', 7), ('VIII', 8), ('IX', 9), ('X', 10), ('XI', 11), ('XII', 12),
    ],
    'Klettersteig': [
        ('K1', 1), ('K2', 2), ('K3', 3), ('K4', 4), ('K5', 5), ('K6', 6),
        ('I', 1), ('II', 2), ('III', 3), ('IV', 4), ('V', 5), ('VI', 6), ('VII', 7), ('VIII', 8), ('IX', 9), ('X', 10), ('XI', 11), ('XII', 12),
    ],
    'Wandern': [('T1', 1), ('T2', 2), ('T3', 3), ('T4', 4), ('T5', 5), ('T6', 6)],
    'Hochtouren': [('L', 1), ('WS', 2), ('ZS', 3), ('S', 4), ('SS', 5), ('AS', 6), ('EX', 7)],
    'Schneeschuhtour': [('WT1', 1), ('WT2', 2), ('WT3', 3), ('WT4', 4), ('WT5', 5), ('WT6', 6)],
    'Ski': [('L', 1), ('WS', 2), ('ZS', 3), ('S', 4), ('SS', 5), ('AS', 6), ('EX', 7)],
    'Mountainbike': [('L', 1), ('WS', 2), ('ZS', 3), ('S', 4), ('SS', 5)]
}


def transform_categories_and_difficulties(labels, difficulties):
    """Derive the tour categories and a numeric difficulty per category.

    Parameters:
        labels: Difficulty labels such as 'Wandern Schwierigkeit:'; the
            category is the label without ' Schwierigkeit:'.
        difficulties: Difficulty texts such as 'T3 - anspruchsvolles
            Bergwandern'; the i-th text belongs to the i-th label.

    Returns:
        A (categories, difficulties) tuple. `categories` is the list of
        category names; `difficulties` is a list of
        {'category': ..., 'difficulty': ...} dicts where 'difficulty' is the
        mapped number, or None when no word of the text is a known grade
        code. (None, None) is returned as soon as one category is unknown.
    """
    categories = [label.replace(' Schwierigkeit:', '').strip() for label in labels]
    if any(category not in category_mappings for category in categories):
        return None, None

    difficulties_result = []

    # zip() stops at the shorter input, so a page that lists more difficulty
    # texts than labels no longer fails with an IndexError.
    for category, difficulty in zip(categories, difficulties):
        grade_by_code = dict(category_mappings[category])
        resulting_difficulty = None
        # When several words are grade codes, the last one wins.
        for difficulty_part in difficulty.split():
            if difficulty_part in grade_by_code:
                resulting_difficulty = grade_by_code[difficulty_part]

        difficulties_result.append({'category': category, 'difficulty': resulting_difficulty})

    return categories, difficulties_result


def transform_views(views):
    """Convert the raw view-counter text into an integer.

    Returns None for a missing (None or empty) value; otherwise the first
    whitespace-separated token is converted with int().
    """
    if not views:
        return None
    tokens = views.split()
    return int(tokens[0])


def transform_features(tour):
    """Turn the raw attribute dict produced by parse() into typed features.

    Every raw attribute is passed through its transform_* function. The
    result has the keys 'name', 'id', 'category', 'date', 'duration',
    'ascent', 'descent', 'difficulty_per_category', 'routepoints' and
    'views'. 'routepoints' is None when the tour lists no waypoints.
    """
    categories, per_category = transform_categories_and_difficulties(
        tour['difficulty_labels'], tour['difficulties']
    )

    transformed_tour = {'name': tour['name'], 'id': tour['id'], 'category': categories}
    transformed_tour['date'] = transform_date(tour['date'])
    transformed_tour['duration'] = transform_duration(tour['duration'])
    transformed_tour['ascent'] = transform_ascent(tour['ascent'])
    transformed_tour['descent'] = transform_descent(tour['descent'])
    transformed_tour['difficulty_per_category'] = per_category

    raw_routepoints = tour['routepoints']
    if raw_routepoints:
        transformed_tour['routepoints'] = transform_routepoints(raw_routepoints)
    else:
        transformed_tour['routepoints'] = None

    transformed_tour['views'] = transform_views(tour['views'])
    return transformed_tour

In [None]:
# Extract the 200posts.zip file in the same folder where this jupyter notebook is located.
# Then you can run the parse function on an example tour:
# Only the read needs the open file; parsing happens once it is closed again.
with open('200posts/post24010.html', encoding='utf-8') as f:
    content = f.read()

r = transform_features(parse([f.name, content]))
print(r)

---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
File <command-1391648822763191>, line 3
      1 # Extract the 200posts.zip file in the same folder where this jupyter notebook is located.
      2 # Then you can run the parse function on an example tour:
----> 3 with open('200posts/post24010.html', encoding='utf-8') as f:
      4     content = f.read()
      5     r = parse([f.name, content])

File /databricks/python/lib/python3.11/site-packages/IPython/core/interactiveshell.py:286, in _modified_open(file

# Part 2 - Parallelization & Aggregation (Spark)

NOTE: It is highly recommended to wait with this part until after the Spark lecture!

NOTE: This part only works on databricks!

To add a library such as scrapy, perform the following steps:

- Go to the "Clusters" panel on the left
- Select your cluster
- Go to the "Libraries" tab
- Click "Install New"
- Choose "PyPI" as library source
- Type the name of the library, "scrapy", into the package field
- Click "Install"
- Wait until the installation has finished

You can now use the newly installed library in your code.

Note: In the Community Edition, Databricks terminates your cluster after 2 hours of inactivity. If you re-create the cluster, you will have to perform these steps again.

In [None]:
# AWS Access configuration
# NOTE(review): the access key ID is hard-coded in the notebook; consider loading it from a
# secret store instead — TODO confirm how credentials are meant to be provided.
# NOTE(review): only the access key ID is set in this cell; no secret key is configured
# here — TODO confirm where it comes from.
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAXLOQRT47SHG4WZNH")

# Contains the whole hikr dataset.
# The full dataset contains 113710 tours and has a size of around 6 GB.
# There are 46854 posts starting with "post1*". Use this dataset for your final results if possible. Execution is likely to take around 30~45 minutes.
# There are 8176 posts starting with "post10*", which is a nicer size for smaller experiments.
# If you want to further shrink the dataset size for testing, you can add another zero to the pattern (post100*.html).
# wholeTextFiles() yields one (path, content) pair per matching file.
tours = sc.wholeTextFiles("s3a://dawr-hikr/post1*.html")

In [None]:
def filter_tour(tour):
    """Decide whether a transformed tour is kept for the ranking.

    A tour is kept only when all of the following hold:
      * it has at least 1000 views,
      * it has categories and neither 'Eisklettern' nor 'Ski' is among them,
      * descent and ascent are both known and at most 1000,
      * the duration is known and at most 7.

    Returns True when the tour is kept, False otherwise.
    """
    def missing_or_above(key, limit):
        value = tour[key]
        return value is None or value > limit

    views = tour['views']
    if views is None or views < 1000:
        return False

    categories = tour['category']
    if categories is None or any(excluded in categories for excluded in ('Eisklettern', 'Ski')):
        return False

    if missing_or_above('descent', 1000) or missing_or_above('ascent', 1000):
        return False

    return not missing_or_above('duration', 7)

In [None]:
# Apply our parse function and persist the parse results so that we can repeat all further steps easier
import pyspark

parsedTours = tours \
  .map(parse) \
  .map(transform_features) \
  .filter(filter_tour) \
  .persist(pyspark.StorageLevel.MEMORY_AND_DISK)

# Count the number of reports for each peak.
# transform_features() stores None under 'routepoints' for tours without any
# waypoint and filter_tour() does not reject such tours, so fall back to an
# empty list here instead of failing with "'NoneType' object is not iterable".
peak_reports = parsedTours \
    .flatMap(lambda tour: [(peak['name'], 1) for peak in (tour['routepoints'] or [])]) \
    .reduceByKey(lambda a, b: a + b)

# Sort peaks by report count in descending order and by name in ascending order
sorted_peaks = peak_reports \
    .sortBy(lambda x: (-x[1], x[0]))

# Collect the top 10 peaks
top_10_peaks = sorted_peaks \
    .take(10)

print(top_10_peaks)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-1391648822763195>, line 17
     11 peak_reports = parsedTours \
     12     .flatMap(lambda tour: [(peak['name'], 1) for peak in tour['routepoints']]) \
     13     .reduceByKey(lambda a, b: a + b)
     15 # Sort peaks by report count in descending order and by name in ascending order
     16 sorted_peaks = peak_reports \
---> 17     .sortBy(lambda x: (-x[1], x[