In [28]:
# Importuojamos bibliotekos
from pyspark.sql import SparkSession  # Spark funkcionalumo pagrindas
import re  # Reguliariosios išraiškos (naudojama tik dalijimui)
import time  # Laiko matavimui (nenaudojama skaičiavimams)

# Konfigūracija
file_path = "duom_full.txt"

# Pagalbinė funkcija linijų apdorojimui
def parse_line_simple(line_text):
    """
    Konvertuoja eilutę '{{key1=value1}{key2=value2}...}' į žodyną {'key1': 'value1', 'key2': 'value2', ...}

    Args:
        line_text (str): Viena eilutė iš failo.

    Returns:
        dict: Žodynas su duomenimis arba None, jei eilutė tuščia ar netinkama.
    """
    record = {}
    try:
        if not line_text or not line_text.strip():
            return None

        # Pašalina '{{' ir '}}', padalija į dalis pagal '}{'
        content = line_text.strip().strip('{}')
        parts = content.split('}{')

        # Apdoroja kiekvieną dalį (pvz., 'a=1')
        for part in parts:
            part_cleaned = part.strip('{}')
            key_value = part_cleaned.split('=', 1)

            if len(key_value) == 2:
                key, value = key_value[0].strip(), key_value[1].strip()
                if key:
                    record[key] = value

        return record if record else None
    except:
        return None

# Spark sesijos inicijavimas
print("Spark sesija")
spark = SparkSession.builder \
    .appName("LogisticsRDDAnalysis_Simple") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("ERROR")  # Mažiau informacinių pranešimų

print(f"Spark sesija inicijuota. Skaitomi duomenys iš: {file_path}")

# Duomenų įkėlimas ir apdorojimas
try:
    print(f"Įkeliami duomenys iš {file_path}...")
    raw_lines_rdd = sc.textFile(file_path)

    print("Apdorojami duomenys...")
    parsed_rdd_with_none = raw_lines_rdd.map(parse_line_simple)
    parsed_rdd = parsed_rdd_with_none.filter(lambda record: record is not None)

    print("Talpinami duomenys atmintyje...")
    parsed_rdd.cache()

    record_count = parsed_rdd.count()
    if record_count == 0:
        print(f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        print(f"Klaida: Nepavyko apdoroti duomenų iš failo: {file_path}")
        print(f"Patikrinkite failo kelią ir formatą '{{key=value}}{{key=value}}...'")
        print(f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        spark.stop()
        exit()
    else:
        print(f"Sėkmingai apdorota ir išsaugota {record_count} įrašų.")

except Exception as e:
    print(f"Klaida skaitant failą '{file_path}': {e}")
    print("Patikrinkite, ar failas egzistuoja ir kelias teisingas.")
    spark.stop()
    exit()

Spark sesija
Spark sesija inicijuota. Skaitomi duomenys iš: duom_full.txt
Įkeliami duomenys iš duom_full.txt...
Apdorojami duomenys...
Talpinami duomenys atmintyje...
Sėkmingai apdorota ir išsaugota 59659 įrašų.


In [29]:
# Užduotis 1

print("\n 1 užduotis")


# Apibrėžiame funkciją svoriui išgauti ir derivinam grupę pagal svorio vertę.
def derive_group_and_extract_weight(record):
    """
    Gets 'svoris' from a record, converts it to float,
    DERIVES the weight group based on the float value,
    and returns a list containing a tuple `[(derived_group, weight_float)]`.
    Returns an empty list `[]` if 'svoris' key is missing or value is not a valid number.
    """
    weight_str = record.get('svoris')

    if weight_str is not None: # Patikrinam, ar egzistuoja raktas „svoris
        try:
            weight_float = float(weight_str) # konvertuojam stringą į skaičių

            if weight_float < 50:
                derived_group = '<50'
            elif weight_float < 300: # jei ne < 50, tikrinam ar < 300
                derived_group = '<300'
            else: # galiausiai lieka tik >= 300
                derived_group = '>300'


            return [(derived_group, weight_float)]

        except ValueError:
            # Jei 'svoris' negali būti konvertuotas į float (pvz.: tekstas)
            return [] # returninam empty listą
    else:
        # Jei nebuvo rakto „svoris
        return [] # irgi grąžinam empty listą

# Naudojau `flatMap` funkciją, kad gauti (derived_group, weight) poras.
weight_data_rdd = parsed_rdd.flatMap(derive_group_and_extract_weight) #


# Apibrėžiu "zero value"
initial_accumulator = (0.0, 0, float('inf'), float('-inf'))

# sequence funkcija
def merge_value_into_accumulator(acc, value):
    current_sum, current_count, current_min, current_max = acc
    new_sum = current_sum + value
    new_count = current_count + 1
    new_min = min(current_min, value)
    new_max = max(current_max, value)
    return (new_sum, new_count, new_min, new_max)

# combiner funkcija
def combine_accumulators(acc1, acc2):
    sum1, count1, min1, max1 = acc1
    sum2, count2, min2, max2 = acc2
    combined_sum = sum1 + sum2
    combined_count = count1 + count2
    combined_min = min(min1, min2)
    combined_max = max(max1, max2)
    return (combined_sum, combined_count, combined_min, combined_max)

# agregacija
aggregated_stats_rdd = weight_data_rdd.aggregateByKey(
    initial_accumulator,
    merge_value_into_accumulator,
    combine_accumulators
)

# funkcija isvesciai
def format_weight_stats(item):
    group, stats = item
    total_sum, count, min_val, max_val = stats
    avg_val = total_sum / count if count > 0 else 0.0
    min_val = min_val if min_val != float('inf') else "N/A"
    max_val = max_val if max_val != float('-inf') else "N/A"
    return (group, {'min': min_val, 'max': max_val, 'avg': avg_val, 'count': count})

# formatavimas
formatted_stats_rdd = aggregated_stats_rdd.map(format_weight_stats)

# rezultatu surinkimas
weight_stats_list = formatted_stats_rdd.collect()


# Apibrėžkiame pagalbinę funkciją, kad kiekvienam grupės pavadinimui būtų priskirtas rūšiavimo eilės numeris
def get_group_sort_order(group_name):
    if group_name == '<50':
        return 0  # pirma grupe
    elif group_name == '<300':
        return 1  # antra grupe
    elif group_name == '>300':
        return 2  # trecia grupe
    else:
        return 99 #netiketos/unikalios grupes paciam gale

if weight_stats_list:
    print("Statistika:")


    # Rūšiuojam sąrašą su custom key funkcija
    # key funkcija taikoma kiekvienam sąrašo elementui (item[0] yra grupės pavadinimas)
    sorted_weight_stats_list = sorted(weight_stats_list, key=lambda item: get_group_sort_order(item[0]))

    # iteruojame per listą
    for group, stats in sorted_weight_stats_list:
        print(f"  Svorio grupė '{group}':")
        print(f"    Minimalus svoris: {stats['min']:.2f}" if isinstance(stats['min'], float) else f"    Min Weight: {stats['min']}")
        print(f"    Maksimalus svoris: {stats['max']:.2f}" if isinstance(stats['max'], float) else f"    Max Weight: {stats['max']}")
        print(f"    Vidutiniškas svoris: {stats['avg']:.2f}")


 1 užduotis
Statistika:
  Svorio grupė '<50':
    Minimalus svoris: 0.00
    Maksimalus svoris: 49.95
    Vidutiniškas svoris: 5.91
  Svorio grupė '<300':
    Minimalus svoris: 50.00
    Maksimalus svoris: 299.20
    Vidutiniškas svoris: 107.82
  Svorio grupė '>300':
    Minimalus svoris: 300.00
    Maksimalus svoris: 6784.00
    Vidutiniškas svoris: 748.02


In [30]:
# Užduotis 2

print("\n 2 užduotis")


# funkcija ištraukimui route, zone, and date.
def extract_route_zone_date_info(record):

    #Gauna 'marsrutas', 'geografinė zona' ir 'sustojimo duomenys' iš įrašo.
    #Grąžina sąrašą, kuriame yra tuple `[(maršrutas, zona, data)]`, jei maršrutas ir zona egzistuoja,
    #arba tuščias sąrašas `[]`.

    route = record.get('marsrutas')
    zone = record.get('geografine zona')
    date = record.get('sustojimo data') # jeigu trūksta bus none

    if route and zone: # Reikia tik route ir zone užduočiai
        return [(route, zone, date)]
    else:
        return []

# `flatMap` ištraukti informacijai
# Inputo RDD: Dictionaries
# Outputo RDD (`route_zone_date_rdd`): ('102', 'Z1', '2018-01-02'), ('102', 'Z1', None), ...
route_zone_date_rdd = parsed_rdd.flatMap(extract_route_zone_date_info)

# jeigu rdd per didelis ir letai veikia
# route_zone_date_rdd.cache()

# Pasirenkam tik maršrutą ir zoną.
route_zone_rdd = route_zone_date_rdd.map(lambda x: (x[0], x[1])) # (route, zone)

# Surandame visą skaičių UNIQUE maršrutų skaičiuojant procentaliai
total_unique_routes_rdd = route_zone_rdd.map(lambda x: x[0]).distinct()
total_unique_routes_count = total_unique_routes_rdd.count()

# Surandame unique route ir zone poras
distinct_route_zone_rdd = route_zone_rdd.distinct()

# Groupinam
zones_per_route_rdd = distinct_route_zone_rdd.groupByKey()

# filtruojam pagal tai, ar unique zones daugiau uz 1
multi_zone_routes_grouped_rdd = zones_per_route_rdd.filter(lambda x: len(list(x[1])) > 1)
multi_zone_route_ids_rdd = multi_zone_routes_grouped_rdd.map(lambda x: x[0])
multi_zone_routes_list = multi_zone_route_ids_rdd.collect()

# skaičiuojam procentus
percentage_part_a = (len(multi_zone_routes_list) / total_unique_routes_count * 100) if total_unique_routes_count > 0 else 0

if multi_zone_routes_list:
    print(f"Maršrutų kiekis: {len(multi_zone_routes_list)} , kurių metu aplankoma daugiau nei vieną geografinė zon (bendrai paėmus).")
    print(f"  Procentas: {percentage_part_a:.2f}%\n")

    routes_to_show = sorted(multi_zone_routes_list)
    print(f"  Maršrutai: {routes_to_show[:100]}{'...' if len(routes_to_show) > 100 else ''}")

# isfiltruojam kur datos nėra
# Input RDD (`route_zone_date_rdd`): ('102', 'Z1', '2018-01-02'), ('102', 'Z1', None), ...
rdd_with_dates = route_zone_date_rdd.filter(lambda x: x[2] is not None) # x[2] is the date

# vėl ieškome unique route ir data porų procentam skaičiuoti
total_unique_route_date_rdd = rdd_with_dates.map(lambda x: (x[0], x[2])).distinct()
total_unique_route_date_count = total_unique_route_date_rdd.count()


# Sukuriam key-value porą, kur raktas yra route ir data o value - zona
route_date_key_rdd = rdd_with_dates.map(lambda x: ((x[0], x[2]), x[1]))

# ieskome situ unique kombinaciju
distinct_route_date_zone_rdd = route_date_key_rdd.distinct()
zones_per_route_day_rdd = distinct_route_date_zone_rdd.groupByKey()

# kaip ir pirmai, filtriuojam kur unique zones skaičius > 1
multi_zone_same_day_grouped_rdd = zones_per_route_day_rdd.filter(lambda x: len(list(x[1])) > 1)
multi_zone_same_day_keys_rdd = multi_zone_same_day_grouped_rdd.map(lambda x: x[0])
multi_zone_same_day_list = multi_zone_same_day_keys_rdd.collect()

# skaičiuoajm procentą
percentage_part_b = (len(multi_zone_same_day_list) / total_unique_route_date_count * 100) if total_unique_route_date_count > 0 else 0

if multi_zone_same_day_list:
    print(f"Maršrutų skaičius {len(multi_zone_same_day_list)} ,kai tą pačią dieną aplankoma daugiau nei vieną zona (iš unikalių porų):")
    print(f"  Procentas to skaičiaus, kai aplankoma daugiau nei viena zona (iš unikalių porų): {percentage_part_b:.2f}%\n")

    # sortinam
    sorted_list = sorted(multi_zone_same_day_list, key=lambda item: (str(item[0]), item[1]))
    print("  Pavyzdžiai: ")
    for route, date in sorted_list[:10]:
        print(f"    Maršrutas: {route}, Data (metai, mėnesis, diena): {date}")


 2 užduotis
Maršrutų kiekis: 339 , kurių metu aplankoma daugiau nei vieną geografinė zon (bendrai paėmus).
  Procentas: 80.33%

  Maršrutai: ['103', '107', '109', '110', '111', '112', '113', '114', '116', '117', '119', '127', '128', '131', '137', '138', '140', '141', '142', '143', '144', '145', '146', '148', '150', '151', '152', '153', '154', '156', '157', '160', '161', '163', '164', '165', '166', '167', '170', '171', '172', '173', '174', '175', '176', '179', '203', '204', '205', '207', '208', '209', '210', '211', '212', '213', '214', '216', '217', '218', '219', '220', '221', '222', '223', '224', '227', '228', '229', '230', '232', '234', '236', '238', '240', '241', '243', '244', '245', '246', '247', '248', '250', '251', '252', '253', '254', '259', '260', '261', '262', '263', '267', '268', '269', '280', '281', '283', '284', '285']...
Maršrutų skaičius 3012 ,kai tą pačią dieną aplankoma daugiau nei vieną zona (iš unikalių porų):
  Procentas to skaičiaus, kai aplankoma daugiau nei viena 

In [26]:
# Užduotis 3

print("\n 3 užduotis")


# Apibrėžiame funkciją, skirtą zonai, savaitės dienai, siuntoms ir klientams išskirti.
def extract_zone_weekday_counts(record):

    #Gauna 'geografine zona', 'sustojimo savaites diena', 'siuntu skaicius',
    #ir 'Sustojimo klientu skaicius'.
    #Konvertuoja counts į int.
    #Grąžina sąrašą `[ ((zona, savaitės diena), (siuntos, klientai)) ]`, jei pavyko,
    #arba `[]` priešingu atveju. Raktas yra tuple (zona, savaitės diena), reikšmė - tuple (siuntos, klientai).

    zone = record.get('geografine zona')
    weekday_str = record.get('sustojimo savaites diena')
    shipments_str = record.get('siuntu skaicius')
    customers_str = record.get('Sustojimo klientu skaicius')

    # ziurime ar yra visi laukai
    if zone and weekday_str and shipments_str and customers_str:
        try:
            # konvertuojam skaicius is stringu i integerius
            weekday = int(weekday_str)
            shipments = int(shipments_str)
            customers = int(customers_str)

            # Key: (zone, weekday)  Value: (shipments, customers)
            return [((zone, weekday), (shipments, customers))]
        except ValueError:
            # Jei nepavyksta konvertuoti kurio nors skaičiaus
            return []
    else:
        # jeigu truksta kokio rakto
        return []

# `flatMap`
# Input RDD: Dictionaries
# Output RDD (`counts_data_rdd`): ( ('Z1', 2), (1, 1) ), ( ('Z1', 2), (4, 1) ), ...
counts_data_rdd = parsed_rdd.flatMap(extract_zone_weekday_counts)

# Naudokite `reduceByKey`, kad susumuotumėte siuntas ir klientus pagal kiekvieną unikalų raktą (zona, savaitės diena).
# `reduceByKey` yra transformacija, kuri sujungia to paties rakto reikšmes naudodama asociatyviąją funkciją.
# Čia funkcija priima du reikšmių rinkinius `a = (shipments1, customers1)` and `b = (shipments2, customers2)`
# ir grąžina naują tuple `(shipments1+shipments2, customers1+customers2)`.

# Input RDD: ( ('Z1', 2), (1, 1) ), ( ('Z1', 2), (4, 1) ), ( ('Z1', 3), (2, 1) ), ...
# Output RDD (`aggregated_counts_rdd`): ( ('Z1', 2), (5, 2) ), ( ('Z1', 3), (2, 1) ), ...
aggregated_counts_rdd = counts_data_rdd.reduceByKey(
    lambda tuple_a, tuple_b: (tuple_a[0] + tuple_b[0], tuple_a[1] + tuple_b[1])
    # tuple_a[0] yra siuntos iš pirmo tuple, tuple_b[0] yra siuntos iš antro tuple
    # tuple_a[1] yra klientai iš pirmojo tuple, tuple_b[1] yra klientai iš antrojo tuple
)

#sortinam
sorted_counts_rdd = aggregated_counts_rdd.sortByKey(ascending=True)

# sucollectiname rezultatus
final_counts_list = sorted_counts_rdd.collect()

# printinam lentelės formate
if final_counts_list:
    print("Geogr. zona   Savaitės d.  Viso siuntų    Viso klientų")

    # iteracija
    for (zone, weekday), (shipments, customers) in final_counts_list:
        print(f"{zone:<12}  {weekday:<11}  {shipments:<13}  {customers:<16} ")


# uzbaigiame spark sesiją
spark.stop()
print("baigta.")


 3 užduotis
Geogr. zona   Savaitės d.  Viso siuntų    Viso klientų
Z1            1            14092          6867             
Z1            2            21024          9979             
Z1            3            20494          10106            
Z1            4            16775          8110             
Z1            5            14653          7616             
Z1            6            302            127              
Z2            1            2910           1927             
Z2            2            4933           3045             
Z2            3            5335           3087             
Z2            4            3603           2234             
Z2            5            4073           2340             
Z2            6            23             11               
Z3            1            2896           1831             
Z3            2            4206           2874             
Z3            3            4531           2914             
Z3            4            3036 