# Homework #4 - MapReduce
## Advanced Information Systems and Big Data - A.Y. 2025-2026

MAP-REDUCE Homework, including dataSet generation

### Environment

In [23]:
# IMPORTS

import random
import csv
from datetime import datetime, timedelta
import os, json
import pandas as pd
import matplotlib.pyplot as plt
import subprocess
import time

### PySpark & mrjob

In [24]:
# mrjob: Python framework for writing MapReduce jobs (simulates Hadoop)
!pip install mrjob



In [25]:
# PySpark: Python interface for Apache Spark (in-memory distributed processing)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, to_date, avg, count, round as spark_round
from pyspark.sql.types import StructType, StructField, StringType, FloatType

---
## Part 1: Dataset

### Generation

Generating a synthetic dataset with highway transits for years 2015 and 2025, simulating toll increases over time and differences between vehicle types.

In [26]:
# NUM_RECORDS: 200,000 transits (50% for year 2015, 50% for year 2025)
NUM_RECORDS = 200000

In [27]:
# VEHICLE_TYPES: 5 categories with different tolls (Auto, Moto, Furgone,
#               Camion, Bus) and cost increases over time
TIPI_VEICOLI = {
    'Auto':
      {'base_2015': 5.0,
       'base_2025': 6.5},
    'Moto':
      {'base_2015': 2.0,
       'base_2025': 3.0},
    'Furgone':
      {'base_2015': 8.0,
       'base_2025': 10.5},
    'Camion':
      {'base_2015': 15.0,
       'base_2025': 20.0},
    'Bus':
      {'base_2015': 12.0,
       'base_2025': 16.0}
}

In [28]:
def genera_veicolo():
    return random.choice(list(TIPI_VEICOLI.keys()))

In [29]:
# ROUTES: 14 real Italian highway routes
TRATTE = [
    'A1-Milano-Bologna', 'A1-Bologna-Firenze', 'A1-Firenze-Roma', 'A1-Roma-Napoli',
    'A4-Torino-Milano', 'A4-Milano-Venezia', 'A4-Venezia-Trieste',
    'A14-Bologna-Rimini', 'A14-Rimini-Ancona', 'A14-Ancona-Bari',
    'A7-Milano-Genova', 'A10-Genova-Ventimiglia',
    'A22-Brennero-Modena', 'A13-Bologna-Padova' ]

In [30]:
def genera_tratta():
    return random.choice(TRATTE)

In [31]:
# TIME_SLOTS: 4 slots (Morning, Afternoon, Evening, Night)
FASCE_ORARIE = [
    'Mattina (6-12)', 'Pomeriggio (12-18)', 'Sera (18-24)', 'Notte (0-6)']

In [32]:
# PROVINCES: 15 real Italian province codes
PROVINCE = [
    'MI', 'RM', 'NA', 'TO', 'BO', 'FI', 'VE', 'GE',
    'BA', 'PA', 'BZ', 'TN', 'PD', 'AN', 'RN']

In [33]:
# Generates a random date in the specified year
def genera_data(anno):
    inizio = datetime(anno, 1, 1)
    fine = datetime(anno, 12, 31)
    delta = fine - inizio
    giorni_casuali = random.randint(0, delta.days)
    return (inizio + timedelta(days=giorni_casuali)).strftime('%d/%m/%Y')

In [34]:
# Generates a realistic toll based on vehicle type, year and route
def genera_pedaggio(tipo_veicolo, anno, tratta):
    if anno == 2015:
        base = TIPI_VEICOLI[tipo_veicolo]['base_2015']
    else:
        base = TIPI_VEICOLI[tipo_veicolo]['base_2025']

    # Multiplier based on route length ->  (simulates length)
    moltiplicatore_tratta = 1.0 + (hash(tratta) % 10) / 10

    # Random variation (+/- 20%) -> (simulates discounts/surcharges)
    variazione = random.uniform(0.8, 1.2)

    return round(base * moltiplicatore_tratta * variazione, 2)

In [35]:
# Generates the complete dataset
def genera_dataset():
    records = []
    id_counter = 1

    # Distribution: 50% records for 2015, 50% for 2025
    for _ in range(NUM_RECORDS // 2):
        for anno in [2015, 2025]:

            tipo_veicolo = genera_veicolo()
            tratta = genera_tratta()

            record = {
                'IDVeicolo': f'V{id_counter:06d}',
                'TipoVeicolo': tipo_veicolo,
                'Tratta': tratta,
                'Pedaggio': genera_pedaggio(tipo_veicolo, anno, tratta),
                'DataTransito': genera_data(anno),
                'FasciaOraria': random.choice(FASCE_ORARIE),
                'Provincia': random.choice(PROVINCE)
            }
            records.append(record)
            id_counter += 1

    return records

dataset = genera_dataset()

### Save

In [36]:
# SAVE DATASET IN CSV FORMAT
OUTPUT_FILE = 'caselli_autostradali.csv'

with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
    fieldnames = ['IDVeicolo', 'TipoVeicolo', 'Tratta',
                  'Pedaggio', 'DataTransito', 'FasciaOraria', 'Provincia']
    writer = csv.DictWriter(f, fieldnames=fieldnames, delimiter=';')
    writer.writeheader() # header with field names
    writer.writerows(dataset)

print(f"Dataset generated: {len(dataset)} records saved to '{OUTPUT_FILE}'")

Dataset generated: 200000 records saved to 'caselli_autostradali.csv'


In [37]:
# ANTEPRIMA DATASET

df = pd.read_csv(OUTPUT_FILE, delimiter=';')
df.head(10)

Unnamed: 0,IDVeicolo,TipoVeicolo,Tratta,Pedaggio,DataTransito,FasciaOraria,Provincia
0,V000001,Furgone,A1-Firenze-Roma,13.68,23/04/2015,Mattina (6-12),TO
1,V000002,Bus,A1-Bologna-Firenze,26.47,17/12/2025,Sera (18-24),VE
2,V000003,Auto,A4-Torino-Milano,9.07,06/05/2015,Mattina (6-12),BA
3,V000004,Auto,A10-Genova-Ventimiglia,7.7,10/11/2025,Sera (18-24),MI
4,V000005,Bus,A1-Milano-Bologna,18.85,03/07/2015,Pomeriggio (12-18),BA
5,V000006,Bus,A1-Bologna-Firenze,22.75,26/12/2025,Mattina (6-12),
6,V000007,Camion,A22-Brennero-Modena,28.39,23/10/2015,Notte (0-6),BZ
7,V000008,Bus,A7-Milano-Genova,26.83,06/07/2025,Pomeriggio (12-18),AN
8,V000009,Moto,A4-Venezia-Trieste,3.02,24/01/2015,Notte (0-6),TO
9,V000010,Auto,A14-Bologna-Rimini,5.78,26/08/2025,Notte (0-6),TN


---
## Part 2: Implementazione con mrjob
Utilizziamo un approccio con **Combiner** per ottimizzare le prestazioni. Poiché la media non è una funzione associativa, nel Combiner calcoliamo somma parziale e conteggio, e nel Reducer calcoliamo la media finale.

**Flusso logico**:
1. **Map**: Per ogni record, se l'anno è 2015 o 2025, emette `(TipoVeicolo_Year, (CostoPedaggio, 1))`
2. **Combine**: Aggrega localmente somma e conteggio per chiave `(TipoVeicolo_Year)`, emette `(TipoVeicolo_Year, (CostoPedaggi, count))`
3. **Reduce**: Calcola la media finale per ogni `(TipoVeicolo, Year)`

In [38]:
%%writefile mapreduce_mrjob.py
# STRUTTURA DI UN JOB MRJOB:
# La classe eredita da MRJob e definisce le fasi del processing:
from mrjob.job import MRJob

class PedaggioMedioMRJob(MRJob):
    """Calcola pedaggio medio per (TipoVeicolo, Year) con Combiner."""

    def mapper(self, _, line): #  Riceve una riga del CSV alla volta
        if line.startswith("IDVeicolo"): # header
            return
        try:
            parts = line.strip().split(";") # Estrae i campi (split sul delimitatore ';')
            tipo, pedaggio = parts[1], float(parts[3])
            anno = int(parts[4].split("/")[-1])
            if anno in [2015, 2025]:
                yield f"{tipo}_{anno}", {"sum": pedaggio, "count": 1} # media NON è associativa, quindi passiamo sum e count
                #  * chiave = "TipoVeicolo_Year" (es. "Auto_2015")
                #  * valore = {"sum": pedaggio, "count": 1}
        except:
            pass


    def combiner(self, key, values):
        # Aggrega localmente somma e conteggio
        s, c = 0, 0
        for v in values:
            s += v["sum"]
            c += v["count"]
        yield key, {"sum": s, "count": c}


    def reducer(self, key, values):
        s, c = 0, 0
        for v in values:
            s += v["sum"]
            c += v["count"]
        yield key, {"media": round(s/c, 2), "transiti": c} # Calcola la media finale: sum(pedaggi) / count(transiti)

if __name__ == "__main__":
    PedaggioMedioMRJob.run()

Overwriting mapreduce_mrjob.py


In [39]:
# JOB EXECUTION MRJOB
start_time = time.time()

# execution
result = subprocess.run(
    ['python', 'mapreduce_mrjob.py', OUTPUT_FILE],
    capture_output=True,
    text=True
)

mrjob_time = time.time() - start_time
print(f"Execution time: {mrjob_time:.2f} seconds")

Execution time: 10.94 seconds


In [40]:
# RESULTS VISUALIZATION MRJOB
# L'output di mrjob è in formato: "chiave"\tJSON_valore

print("Risultati mrjob:")
print("-" * 40)

mrjob_results = {}
res2015 = []
res2025 = []

for line in result.stdout.strip().split('\n'): # split by lines
    if line:
        try:
            key, value = line.split('\t') # mrjob output: "chiave"\tJSON_valore
            # PARSING
            key = key.strip('"')
            value = json.loads(value.replace("'", '"'))

            mrjob_results[key] = value
            if "2015" in key:
                res2015.append(f"{key}: Media = €{value['media']:.2f}, Transiti = {value['transiti']}")
            elif "2025" in key:
                res2025.append(f"{key}: Media = €{value['media']:.2f}, Transiti = {value['transiti']}")
        except:
            print(line)

for item in sorted(res2015):
    print(item)
print()
for item in sorted(res2025):
    print(item)

Risultati mrjob:
----------------------------------------
Auto_2015: Media = €7.85, Transiti = 19964
Bus_2015: Media = €18.84, Transiti = 20053
Camion_2015: Media = €23.57, Transiti = 20180
Furgone_2015: Media = €12.58, Transiti = 19934
Moto_2015: Media = €3.15, Transiti = 19869

Auto_2025: Media = €10.21, Transiti = 19966
Bus_2025: Media = €25.18, Transiti = 19981
Camion_2025: Media = €31.34, Transiti = 19982
Furgone_2025: Media = €16.49, Transiti = 20010
Moto_2025: Media = €4.73, Transiti = 20061


---
## Part 3: Implementazione con PySpark

PySpark offre un'API più ad alto livello rispetto a mrjob, con ottimizzazioni automatiche e supporto per operazioni complesse.


In [43]:
pyspark_code = '''

# IMPLEMENTAZIONE MAPREDUCE CON PYSPARK
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
import time
import json


def main():
    # INIZIALIZZAZIONE SPARKSESSION (SparkContext creato auto)
    #   master("local[*]") usa tutti i core disponibili in locale
    spark = SparkSession.builder \
        .appName("PedaggioMedio_MapReduce") \
        .master("local[*]") \
        .config("spark.driver.memory", "2g") \
        .config("spark.log.level", "ERROR") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR") #toglie log ridondanti

    start_time = time.time()

    # CARICAMENTO DEI DATI
    df = spark.read.csv(
      "caselli_autostradali.csv",
      header=True,
      sep=";",
      inferSchema=True)

    # conversione dataFrame in RDD
    rdd = df.rdd


    # ========================================
    # IMPLEMENTAZIONE MAPREDUCE CON RDD
    # ========================================

    # Funzione per estrarre l'anno dalla data in formato GG/MM/YYYY
    def extract_year(date_str):
        return int(date_str.split("/")[-1])


    # MAP function
    # map() trasforma in rdd con le colonne: (TipoVeicolo, Pedaggio, Year)
    # filter() trasforma in rdd con le righe con Year 2015 o 2025
    # map() trasforma in rdd con le colonne: (TipoVeicolo_Year, (Pedaggio, 1))
    mapped_rdd = rdd \
        .map(lambda row: (row["TipoVeicolo"], row["Pedaggio"], extract_year(row["DataTransito"]))) \
        .filter(lambda x: x[2] in [2015, 2025]) \
        .map(lambda x: (f"{x[0]}_{x[2]}", (x[1], 1)))

    # REDUCE function
    # reduceByKey() trasforma in rdd aggregando per chiave (TipoVeicolo_Year) e sommando i valori ([0]=pedaggio, [1]=conteggio)
    #     include la funzione di combiner
    # mapValues() trasforma in rdd con le colonne: (TipoVeicolo_Year, (media, conteggio))
    #     la media è calcolata dividendo la somma dei pedaggi per il conteggio
    reduced_rdd = mapped_rdd \
        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
        .mapValues(lambda x: {"media": round(x[0] / x[1], 2), "transiti": x[1]})


    # Collect and print results (similar format to mrjob for consistency with following sections)
    risultati = reduced_rdd.collect()
    for chiave, valore in sorted(risultati):
        print(f'"{chiave}"\t{json.dumps(valore)}')

    elapsed_time = time.time() - start_time
    print(f"Execution time: {elapsed_time:.2f} seconds", file=__import__(\'sys\').stderr)

    spark.stop()

if __name__ == "__main__":
    main()
'''

# Salva il file
with open('mapreduce_pyspark.py', 'w') as f:
    f.write(pyspark_code)

In [44]:
# JOB EXECUTION PYSPARK
start_time = time.time()

# execution
pyspark_result = subprocess.run(
    ['python', 'mapreduce_pyspark.py'],
    capture_output=True,
    text=True
)

pyspark_time = time.time() - start_time
print(f"Execution time: {pyspark_time:.2f} seconds")

Execution time: 30.07 seconds


In [45]:
# RESULTS VISUALIZATION SPARK
# Output format: "chiave"\tJSON_valore

print("Risultati PySpark:")
print("-" * 40)

# Parse and display results
pyspark_results = {}
res2015 = []
res2025 = []

for line in pyspark_result.stdout.strip().split('\n'): # split by lines
    if line:
        try:
            key, value = line.split('\t')  # mrjob output: "chiave"\tJSON_valore
            # PARSING
            key = key.strip('"')
            value = json.loads(value.replace("'", '"'))

            pyspark_results[key] = value
            if "2015" in key:
                res2015.append(f"{key}: Media = €{value['media']:.2f}, Transiti = {value['transiti']}")
            elif "2025" in key:
                res2025.append(f"{key}: Media = €{value['media']:.2f}, Transiti = {value['transiti']}")
        except:
            print(line)

for item in sorted(res2015):
    print(item)
print()
for item in sorted(res2025):
    print(item)

Risultati PySpark:
----------------------------------------
Auto_2015: Media = €7.85, Transiti = 19964
Bus_2015: Media = €18.84, Transiti = 20053
Camion_2015: Media = €23.57, Transiti = 20180
Furgone_2015: Media = €12.58, Transiti = 19934
Moto_2015: Media = €3.15, Transiti = 19869

Auto_2025: Media = €10.21, Transiti = 19966
Bus_2025: Media = €25.18, Transiti = 19981
Camion_2025: Media = €31.34, Transiti = 19982
Furgone_2025: Media = €16.49, Transiti = 20010
Moto_2025: Media = €4.73, Transiti = 20061


---
## Part 4: Calculate 2015 vs 2025 Variation

In [46]:
# TOLL VARIATION ANALYSIS 2015 -> 2025
print("=" * 60)
print("AVERAGE TOLL VARIATION 2015 -> 2025")
print("=" * 60)

# Group results by vehicle type
pedaggi_2015 = {}
pedaggi_2025 = {}

for key, value in mrjob_results.items():
    tipo, anno = key.rsplit('_', 1)
    if anno == '2015':
        pedaggi_2015[tipo] = value['media']
    else:
        pedaggi_2025[tipo] = value['media']

# Calculate and print variations
print(f"{'Vehicle Type':<12} {'2015':>10} {'2025':>10} {'Abs.Var.':>10} {'Var.%':>10}")
print("-" * 54)

variazioni = []
for tipo in sorted(pedaggi_2015.keys()):
    p2015 = pedaggi_2015[tipo]
    p2025 = pedaggi_2025[tipo]

    # Absolute variation: pedaggio_2025 - pedaggio_2015 (in €)
    var_ass = p2025 - p2015
    # Percentage variation: ((2025 - 2015) / 2015) * 100
    var_perc = ((p2025 - p2015) / p2015) * 100

    variazioni.append({'tipo': tipo, '2015': p2015, '2025': p2025, 'var_ass': var_ass, 'var_perc': var_perc})
    print(f"{tipo:<12} {p2015:>10.2f}€ {p2025:>10.2f}€ {var_ass:>+10.2f}€ {var_perc:>+9.1f}%")

AVERAGE TOLL VARIATION 2015 -> 2025
Vehicle Type       2015       2025   Abs.Var.      Var.%
------------------------------------------------------
Auto               7.85€      10.21€      +2.36€     +30.1%
Bus               18.84€      25.18€      +6.34€     +33.7%
Camion            23.57€      31.34€      +7.77€     +33.0%
Furgone           12.58€      16.49€      +3.91€     +31.1%
Moto               3.15€       4.73€      +1.58€     +50.2%
