In [192]:


# Import necessary libraries
import math # Used to reject non-finite (nan/inf) weights
import re # Regular expressions, used here for parsing (though the final version uses splits)
import time # To potentially measure execution time (not used for calculation here)

from pyspark.sql import SparkSession # The main entry point for Spark functionality

# --- Configuration ---
file_path = "duom_full.txt"

# --- Helper Function to Parse Lines ---

def parse_line_simple(line_text):
    """
    Parse one raw input line into a dict of key/value strings.

    A line is expected to look like ``{{key1=value1}{key2=value2}...}`` and is
    converted to ``{'key1': 'value1', 'key2': 'value2', ...}``.

    Only the first '=' of each pair separates key from value, so values may
    themselves contain '='. Keys and values are whitespace-stripped; pairs with
    an empty key or without '=' are skipped. A value may be an empty string.
    If a key repeats, the last occurrence wins.

    Args:
        line_text (str): A single line from the raw data file.

    Returns:
        dict: The parsed key/value pairs, or
        None: if the input is not a string, is empty/blank, or contains no
        usable ``key=value`` pair.
    """
    # Only text lines can be parsed. Checking up front replaces a catch-all
    # `except Exception`, which could only ever fire for non-string input.
    if not isinstance(line_text, str):
        return None

    stripped = line_text.strip()
    if not stripped:
        return None  # Empty or whitespace-only line

    # Drop the outer braces: '{{a=1}{b=2}}' -> 'a=1}{b=2'.
    # strip('{}') removes *all* leading/trailing braces, which is fine for this format.
    content = stripped.strip('{}')

    record = {}
    # Each '}{' separates two pairs: 'a=1}{b=2' -> ['a=1', 'b=2'].
    for part in content.split('}{'):
        # A stray brace may remain on the first/last piece, so strip it as well.
        key, sep, value = part.strip('{}').partition('=')
        key = key.strip()
        # `sep` is empty when the piece had no '=' at all.
        if sep and key:
            record[key] = value.strip()

    return record or None

# --- Spark Session Initialization ---

# Build (or reuse) the SparkSession, the entry point to Spark functionality.
# .appName():          names the job (shown e.g. in the Spark UI).
# .master("local[*]"): run locally using all available CPU cores; this suits
#                      development on one machine, a cluster needs another master URL.
# .getOrCreate():      return the existing session if there is one, else create it.
print("Initializing Spark Session...")
spark = (
    SparkSession.builder
    .appName("LogisticsRDDAnalysis_Simple")
    .master("local[*]")
    .getOrCreate()
)

# The SparkContext ('sc') exposes the low-level RDD API used throughout this notebook.
sc = spark.sparkContext

# Show only ERROR-level Spark log messages (hides INFO and WARN noise).
sc.setLogLevel("ERROR")

print(f"Spark Session Initialized. Reading data from file: {file_path}")

# --- Data Loading and Parsing ---

# Any failure in this step stops Spark and ends the script with a NON-zero exit
# status. The bare `exit()` used previously exited with status 0, so a failed
# run looked successful. It also depends on the interactive-only `site` helper.
try:
    # 1. Read the text file into an RDD (Resilient Distributed Dataset), a
    #    partitioned collection that Spark processes in parallel. Each element
    #    is one line (string) of the file. textFile() itself is lazy, so errors
    #    such as a missing input path usually surface at the first action below.
    print(f"Loading data from {file_path}...")
    raw_lines_rdd = sc.textFile(file_path)

    # 2. Parse every line with parse_line_simple. `map` is a lazy
    #    transformation: it yields a dict for good lines and None for
    #    empty/unparsable ones.
    print("Parsing data...")
    parsed_rdd_with_none = raw_lines_rdd.map(parse_line_simple)

    # 3. Drop the None entries so only parsed dictionaries remain.
    parsed_rdd = parsed_rdd_with_none.filter(lambda record: record is not None)

    # 4. Mark the parsed RDD for in-memory persistence. Every task below reuses
    #    parsed_rdd; without caching, each action would re-read and re-parse the
    #    file. Like transformations, caching only takes effect at the first action.
    print("Caching parsed data for faster access...")
    parsed_rdd.cache()

    # 5. count() is an action: it runs read -> parse -> filter, fills the cache,
    #    and tells us whether anything was parsed at all.
    record_count = parsed_rdd.count()
    if record_count == 0:
        # Nothing parsed: bad path, empty file, or unexpected line format.
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        print(f"Warning: No records were successfully parsed from the file: {file_path}")
        print("Please check that the file exists, the path is correct, and the format")
        print(f"matches the expected '{{key=value}}{{key=value}}...' structure.")
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        spark.stop()
        # SystemExit is not an Exception subclass, so the handler below does
        # not intercept it.
        raise SystemExit(1)

    print(f"Successfully parsed and cached {record_count} records.")

except Exception as e:
    # Reading or parsing failed (e.g. the input file does not exist).
    print(f"Error reading or parsing file '{file_path}': {e}")
    print("Please ensure the file exists and the path is correct in the 'file_path' variable.")
    spark.stop()
    raise SystemExit(1) from e

Initializing Spark Session...
Spark Session Initialized. Reading data from file: duom_full.txt
Loading data from duom_full.txt...
Parsing data...
Caching parsed data for faster access...


[Stage 0:>                                                          (0 + 7) / 7]

Successfully parsed and cached 59659 records.


                                                                                

### Slenkstinis lygmuo (5-6): Suskaičiuokite mažiausią, didžiausią ir vidutinį (aritmetinis vidurkis) siuntų svorį (laukas "svoris") skirtingose svorio grupėse (laukas "svorio grupe").

In [195]:
# ==============================================================================
# == Task 1: Min/Max/Avg Weight per DERIVED Weight Group ('Slenkstinis lygmuo') ==
# ==============================================================================
print("\n--- 1 užduotis: Mažiausio / Didžiausio / Vidutinio svorio skaičiavimas pagal svorio grupę ---")


def derive_group_and_extract_weight(record):
    """
    Extract the weight of one parsed record and derive its weight group.

    The group is computed from the numeric 'svoris' value:
    below 50 -> '<50', 50 up to (but not including) 300 -> '<300',
    and 300 or more -> '>300'.

    Args:
        record (dict): A parsed record as produced by parse_line_simple.

    Returns:
        list: ``[(derived_group, weight_float)]`` when 'svoris' is present and is
        a finite number; ``[]`` otherwise, so the record is dropped by flatMap.
    """
    weight_str = record.get('svoris')
    if weight_str is None:
        return []  # 'svoris' key missing

    try:
        weight_float = float(weight_str)
    except (TypeError, ValueError):
        return []  # Not a number (e.g. text)

    # float() also accepts 'nan' and 'inf'. Such values would slip through both
    # comparisons below into the '>300' group and corrupt its sum, min and max.
    if not math.isfinite(weight_float):
        return []

    if weight_float < 50:
        derived_group = '<50'
    elif weight_float < 300:
        derived_group = '<300'
    else:
        derived_group = '>300'  # weight >= 300

    return [(derived_group, weight_float)]

# Turn every parsed record into zero or one (derived_group, weight) pair.
# The group is computed from the weight itself; the file's own
# 'svorio grupe' field is deliberately not used.
weight_data_rdd = parsed_rdd.flatMap(derive_group_and_extract_weight)


# Starting accumulator for aggregateByKey, laid out as (sum, count, min, max).
# The infinities guarantee that the first real weight replaces both extremes.
initial_accumulator = (0.0, 0, float('inf'), float('-inf'))

# Sequence function for aggregateByKey: folds one value into an accumulator.
def merge_value_into_accumulator(acc, value):
    """Return a new (sum, count, min, max) tuple with *value* folded into *acc*."""
    total, n, lo, hi = acc
    return (total + value, n + 1, min(lo, value), max(hi, value))

# Combiner function for aggregateByKey: merges two partial accumulators.
def combine_accumulators(acc1, acc2):
    """Merge two (sum, count, min, max) tuples into one covering both."""
    (s1, c1, lo1, hi1), (s2, c2, lo2, hi2) = acc1, acc2
    return (s1 + s2, c1 + c2, min(lo1, lo2), max(hi1, hi2))

# Fold all weights of each derived group into one (sum, count, min, max) tuple:
# seqFunc works within a partition, combFunc merges results across partitions.
aggregated_stats_rdd = weight_data_rdd.aggregateByKey(
    zeroValue=initial_accumulator,
    seqFunc=merge_value_into_accumulator,
    combFunc=combine_accumulators,
)

# Convert one aggregated (group, accumulator) item into a readable stats dict.
def format_weight_stats(item):
    """
    Turn ``(group, (sum, count, min, max))`` into
    ``(group, {'min': ..., 'max': ..., 'avg': ..., 'count': ...})``.

    The average is 0.0 for an empty accumulator, and untouched infinite
    extremes are reported as "N/A".
    """
    group, (total_sum, count, min_val, max_val) = item
    return (
        group,
        {
            'min': "N/A" if min_val == float('inf') else min_val,
            'max': "N/A" if max_val == float('-inf') else max_val,
            'avg': total_sum / count if count > 0 else 0.0,
            'count': count,
        },
    )

# Convert each group's raw accumulator into a readable stats dict, then pull
# the handful of groups back to the driver with collect().
formatted_stats_rdd = aggregated_stats_rdd.map(format_weight_stats)
weight_stats_list = formatted_stats_rdd.collect()


# Sort key that orders the weight groups from lightest to heaviest.
def get_group_sort_order(group_name):
    """Return 0, 1 or 2 for '<50', '<300', '>300'; any other group ranks last (99)."""
    for rank, known_group in enumerate(('<50', '<300', '>300')):
        if group_name == known_group:
            return rank
    return 99

if weight_stats_list:
    print("Svorio statistika pagal svorio grupę:")

    # Order groups lightest-first. A plain string sort would put '<300'
    # before '<50', because '3' < '5'.
    sorted_weight_stats_list = sorted(weight_stats_list, key=lambda item: get_group_sort_order(item[0]))

    for group, stats in sorted_weight_stats_list:
        print(f"  Svorio grupė '{group}':")
        # format_weight_stats may return "N/A" instead of a float for min/max,
        # so only floats get numeric formatting. Both branches use the same
        # Lithuanian label (the fallback previously printed English labels).
        for label, key in (("Minimalus svoris", "min"), ("Maksimalus svoris", "max")):
            value = stats[key]
            shown = f"{value:.2f}" if isinstance(value, float) else value
            print(f"    {label}: {shown}")
        print(f"    Vidutiniškas svoris: {stats['avg']:.2f}")
else:
    # Without this branch an empty result would print nothing at all.
    print("Svorio statistikos apskaičiuoti nepavyko: nerasta įrašų su tinkama 'svoris' reikšme.")



--- 1 užduotis: Mažiausio / Didžiausio / Vidutinio svorio skaičiavimas pagal svorio grupę ---
Svorio statistika pagal svorio grupę:
  Svorio grupė '<50':
    Minimalus svoris: 0.00
    Maksimalus svoris: 49.95
    Vidutiniškas svoris: 5.91
  Svorio grupė '<300':
    Minimalus svoris: 50.00
    Maksimalus svoris: 299.20
    Vidutiniškas svoris: 107.82
  Svorio grupė '>300':
    Minimalus svoris: 300.00
    Maksimalus svoris: 6784.00
    Vidutiniškas svoris: 748.02


### Tipinis lygmuo (7-8): Raskite maršrutus, kurie aplanko daugiau nei vieną geografinę zoną (laukas "geografine zona"). Koks tai procentas nuo visų maršrutų? Raskite atvejus, kai tai daroma tą pačią dieną (laukas "sustojimo data"). Koks tai procentas?

In [201]:
# ==============================================================================
# == Task 2: Routes Spanning Multiple Zones ('Tipinis lygmuo') ==
# ==============================================================================
print("\n--- Užduotis 2: Maršrutų, aplankančių kelias geografines zonas, paieška ---")


def extract_route_zone_date_info(record):
    """
    Pick the route, zone and stop date out of one parsed record.

    Returns ``[(route, zone, date)]`` when both 'marsrutas' and
    'geografine zona' are non-empty, else ``[]``. The date ('sustojimo data')
    is passed through as-is and is None when the key is absent; Part B uses it.
    """
    route = record.get('marsrutas')
    zone = record.get('geografine zona')
    if not (route and zone):
        return []
    return [(route, zone, record.get('sustojimo data'))]

# Extract (route, zone, date) tuples; records without a route or zone are dropped.
# e.g. ('102', 'Z1', '2018-01-02'), ('102', 'Z1', None), ...
route_zone_date_rdd = parsed_rdd.flatMap(extract_route_zone_date_info)
# Optional: Cache if the RDD is large and computations are slow
# route_zone_date_rdd.cache()


# --- Part A: Routes visiting more than one zone (overall, any day) ---

# 1. Keep only (route, zone).
route_zone_rdd = route_zone_date_rdd.map(lambda x: (x[0], x[1]))

# 2. Percentage denominator: number of distinct routes that have zone data.
total_unique_routes_rdd = route_zone_rdd.map(lambda x: x[0]).distinct()
total_unique_routes_count = total_unique_routes_rdd.count()

# 3. Deduplicate (route, zone) pairs so each zone is counted once per route.
distinct_route_zone_rdd = route_zone_rdd.distinct()

# 4. Group the distinct zones by route.
zones_per_route_rdd = distinct_route_zone_rdd.groupByKey()

# 5. Keep routes that have more than one distinct zone.
multi_zone_routes_grouped_rdd = zones_per_route_rdd.filter(lambda x: len(list(x[1])) > 1)

# 6. Keep just the route ids.
multi_zone_route_ids_rdd = multi_zone_routes_grouped_rdd.map(lambda x: x[0])

# 7. Action: bring the (small) list of route ids to the driver.
multi_zone_routes_list = multi_zone_route_ids_rdd.collect()

# 8. Share of all distinct routes, in percent.
percentage_part_a = (len(multi_zone_routes_list) / total_unique_routes_count * 100) if total_unique_routes_count > 0 else 0

print("\n--- Užduotis 2 (A dalis) ---")

if multi_zone_routes_list:
    print(f"Rasta {len(multi_zone_routes_list)} maršrutų, aplankančių daugiau nei vieną geografinę zoną (bendroje analizėje).")
    print(f"  Procentas nuo visų unikalių maršrutų: {percentage_part_a:.2f}%\n")

    # Route ids are strings, so this is a lexicographic sort.
    routes_to_show = sorted(multi_zone_routes_list)
    print(f"  Maršrutai (rodomi pirmi 100): {routes_to_show[:100]}{'...' if len(routes_to_show) > 100 else ''}")
else:
    print("Nerasta maršrutų, aplankančių daugiau nei vieną geografinę zoną.")








# --- Part B: Routes visiting more than one zone on the SAME day ---

# 1. Keep only records with a usable stop date. A missing key (None) and an
#    empty value ('') both count as missing: parse_line_simple keeps keys whose
#    value is empty. Without this, every date-less stop of a route would be
#    lumped into one artificial (route, '') "day" and distort the counts.
rdd_with_dates = route_zone_date_rdd.filter(lambda x: bool(x[2]))  # x[2] is the date

# 2. Percentage denominator: number of distinct (route, date) pairs.
total_unique_route_date_rdd = rdd_with_dates.map(lambda x: (x[0], x[2])).distinct()
total_unique_route_date_count = total_unique_route_date_rdd.count()

# 3. Key by (route, date), with the zone as value.
route_date_key_rdd = rdd_with_dates.map(lambda x: ((x[0], x[2]), x[1]))

# 4. Deduplicate so each zone counts once per (route, date).
distinct_route_date_zone_rdd = route_date_key_rdd.distinct()

# 5. Group the distinct zones by (route, date).
zones_per_route_day_rdd = distinct_route_date_zone_rdd.groupByKey()

# 6. Keep (route, date) keys that have more than one distinct zone.
multi_zone_same_day_grouped_rdd = zones_per_route_day_rdd.filter(lambda x: len(list(x[1])) > 1)

# 7. Keep just the (route, date) keys.
multi_zone_same_day_keys_rdd = multi_zone_same_day_grouped_rdd.map(lambda x: x[0])

# 8. Action: bring the keys to the driver.
multi_zone_same_day_list = multi_zone_same_day_keys_rdd.collect()

# 9. Share of all distinct (route, date) pairs, in percent.
percentage_part_b = (len(multi_zone_same_day_list) / total_unique_route_date_count * 100) if total_unique_route_date_count > 0 else 0

print("\n\n\n\n--- Užduotis 2 (B dalis) ---")

if multi_zone_same_day_list:
    print(f"Rasta {len(multi_zone_same_day_list)} atvejų (unikalių maršruto-datos porų), kai maršrutas tą pačią dieną aplankė daugiau nei vieną zoną.")
    print(f"  Procentas nuo visų unikalių maršruto-datos porų: {percentage_part_b:.2f}%\n")

    # Sort by route, then by date, so the sample output is stable between runs.
    sorted_list = sorted(multi_zone_same_day_list, key=lambda item: (str(item[0]), item[1]))
    print("  Pavyzdžiai (rodomi pirmi 10):")
    for route, date in sorted_list[:10]:
        print(f"    Maršrutas: {route}, Data: {date}")
else:
    print("Nerasta atvejų, kai maršrutas tą pačią dieną aplankė daugiau nei vieną zoną.")


# Optional: Unpersist if RDD was cached
# route_zone_date_rdd.unpersist()


--- Užduotis 2: Maršrutų, aplankančių kelias geografines zonas, paieška ---

--- Užduotis 2 (A dalis) ---
Rasta 339 maršrutų, aplankančių daugiau nei vieną geografinių zoną (bendroje analizėje).
  Procentas nuo visų unikalių maršrutų: 80.33%

  Maršrutai (rodomi pirmi 100): ['103', '107', '109', '110', '111', '112', '113', '114', '116', '117', '119', '127', '128', '131', '137', '138', '140', '141', '142', '143', '144', '145', '146', '148', '150', '151', '152', '153', '154', '156', '157', '160', '161', '163', '164', '165', '166', '167', '170', '171', '172', '173', '174', '175', '176', '179', '203', '204', '205', '207', '208', '209', '210', '211', '212', '213', '214', '216', '217', '218', '219', '220', '221', '222', '223', '224', '227', '228', '229', '230', '232', '234', '236', '238', '240', '241', '243', '244', '245', '246', '247', '248', '250', '251', '252', '253', '254', '259', '260', '261', '262', '263', '267', '268', '269', '280', '281', '283', '284', '285']...




--- Užduotis 2 (

### Puikus lygmuo (9-10): Sudarykite lentelę, kurioje matytųsi, kiek pristatyta siuntų ("siuntu skaicius") bei aptarnauta klientų ("Sustojimo klientu skaicius") skirtingose geografinėse zonose ("geografine zona") skirtingomis savaitės dienomis ("sustojimo savaites diena"). Palyginkite užduoties sprendimo laiką su MapReduce versija.

In [177]:
# ==============================================================================
# == Task 3: Shipments/Customers per Zone per Weekday ('Puikus lygmuo') ==
# ==============================================================================
print("\n--- 3 užduotis: Siuntų ir klientų agregavimas pagal zoną ir savaitės dieną ---")


def extract_zone_weekday_counts(record):
    """
    Build one ((zone, weekday), (shipments, customers)) pair from a parsed record.

    Reads 'geografine zona', 'sustojimo savaites diena', 'siuntu skaicius'
    and 'Sustojimo klientu skaicius'. The last three are converted to int.

    Returns a one-element list with that pair, or ``[]`` when any field is
    missing/empty or a number cannot be converted.
    """
    zone = record.get('geografine zona')
    raw_numbers = (
        record.get('sustojimo savaites diena'),
        record.get('siuntu skaicius'),
        record.get('Sustojimo klientu skaicius'),
    )

    # Every field must be present and non-empty.
    if not (zone and all(raw_numbers)):
        return []

    try:
        weekday, shipments, customers = [int(text) for text in raw_numbers]
    except ValueError:
        return []  # At least one field is not an integer

    return [((zone, weekday), (shipments, customers))]

# The task asks to compare the solution's run time with the MapReduce version,
# so the Spark work below is timed. parsed_rdd was cached and materialized by
# count() during loading, so this mostly measures the aggregation, not file
# reading/parsing.
task3_start_time = time.perf_counter()

# 1. flatMap: one ((zone, weekday), (shipments, customers)) pair per valid
#    record; records with missing or non-integer fields yield nothing.
#    e.g. (('Z1', 2), (1, 1)), (('Z1', 2), (4, 1)), ...
counts_data_rdd = parsed_rdd.flatMap(extract_zone_weekday_counts)

# 2. reduceByKey: add the (shipments, customers) tuples element-wise for each
#    (zone, weekday) key, e.g. (1, 1) + (4, 1) -> (5, 2).
aggregated_counts_rdd = counts_data_rdd.reduceByKey(
    lambda tuple_a, tuple_b: (tuple_a[0] + tuple_b[0], tuple_a[1] + tuple_b[1])
)

# 3. Sort by key (zone first, then weekday) to produce a readable table.
sorted_counts_rdd = aggregated_counts_rdd.sortByKey(ascending=True)

# 4. collect() is the action that runs the job. There is one row per
#    (zone, weekday) combination, so the result easily fits on the driver.
final_counts_list = sorted_counts_rdd.collect()

task3_elapsed_seconds = time.perf_counter() - task3_start_time

if final_counts_list:
    print("Bendras siuntų ir aptarnautų klientų skaičius pagal zoną ir savaitės dieną:\n")
    print("Geogr. zona   Savaitės d.  Viso siuntų    Viso klientų")

    for (zone, weekday), (shipments, customers) in final_counts_list:
        # Left-aligned fixed-width columns that line up with the header above.
        print(f"{zone:<12}  {weekday:<11}  {shipments:<13}  {customers:<16} ")
else:
    print("Nerasta įrašų su visais reikalingais laukais (zona, savaitės diena, siuntų ir klientų skaičiai).")

print(f"\nUžduoties sprendimo laikas (Spark RDD): {task3_elapsed_seconds:.2f} s")


# --- Stop Spark Session ---
# Release the resources held by the SparkSession.
print("\nAnalysis complete. Stopping Spark session.")
spark.stop()
print("Spark session stopped.")


--- 3 užduotis: Siuntų ir klientų agregavimas pagal zoną ir savaitės dieną ---
Bendras siuntų ir aptarnautų klientų skaičius pagal zoną ir savaitės dieną:

Geogr. zona   Savaitės d.  Viso siuntų    Viso klientų
Z1            1            14092          6867             
Z1            2            21024          9979             
Z1            3            20494          10106            
Z1            4            16775          8110             
Z1            5            14653          7616             
Z1            6            302            127              
Z2            1            2910           1927             
Z2            2            4933           3045             
Z2            3            5335           3087             
Z2            4            3603           2234             
Z2            5            4073           2340             
Z2            6            23             11               
Z3            1            2896           1831             
Z3      