In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, to_date, when, round, first
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import LinearRegression
import os

# Aplinkos paruošimas
os.environ["PYSPARK_PYTHON"] = r"C:\Users\Eivinas\DDA\ld2\venv\Scripts\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = r"C:\Users\Eivinas\DDA\ld2\venv\Scripts\python.exe"

spark = SparkSession.builder \
    .appName("DDA LD3") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.local.dir", "C:/tmp/spark-temp") \
    .getOrCreate()

sc = spark.sparkContext

# Patikriname ar veikia
spark

In [3]:
# Nuskaitome duomenis
df = spark.read.option("header", "true").option("inferSchema", "true").csv("StructuredData.txt")

In [4]:
# Pirmas uždavinys
df = df.withColumn("kaina procentas", col("kaina procentas").cast("double")) \
       .withColumn("kaina vienetais", col("kaina vienetais").cast("double")) \
       .withColumn("siuntu skaicius", col("siuntu skaicius").cast("int")) \
       .withColumn("svoris", col("svoris").cast("double")) \
       .withColumn("sustojimo data", to_date(col("sustojimo data"), "yyyy-MM-dd"))


In [5]:
aggregated = df.groupBy("marsrutas", "sustojimo data").agg(
    _sum("siuntu skaicius").alias("bendras_siuntu_skaicius"),
    _sum("svoris").alias("bendras_svoris"),
    _sum("kaina procentas").alias("kaina_procentas_total"),
    _sum("kaina vienetais").alias("kaina_vienetais_total")
)

# Suskaičiuoti pilną kainą tada ir tik tada, kai kaina procentas ir kaina vienetais yra didesni už 0
aggregated = aggregated.withColumn(
    "pilna_kaina",
    when((col("kaina_procentas_total") > 0) & (col("kaina_vienetais_total") > 0),
         col("kaina_vienetais_total") / col("kaina_procentas_total"))
    .otherwise(0)
)


In [6]:
# Apskaičiuoti bendrąsias siuntų, svorio ir kainos sumas pagal maršrutą
route_totals = aggregated.groupBy("marsrutas").agg(
    _sum("bendras_siuntu_skaicius").alias("total_siuntos"),
    _sum("bendras_svoris").alias("total_svoris"),
    round(_sum("pilna_kaina"), 2).alias("total_kaina")
)


In [7]:
top_by_kaina = route_totals.orderBy(col("total_kaina").desc()).limit(3)
top_by_siuntos = route_totals.orderBy(col("total_siuntos").desc()).limit(3)
top_by_svoris = route_totals.orderBy(col("total_svoris").desc()).limit(3)

print("Top 3 maršrutai pagal pilną kainą:")
top_by_kaina.show()

print("Top 3 maršrutai pagal siuntų skaičių:")
top_by_siuntos.show()

print("Top 3 maršrutai pagal bendrą svorį:")
top_by_svoris.show()


Top 3 maršrutai pagal pilną kainą:
+---------+-------------+------------------+-----------+
|marsrutas|total_siuntos|      total_svoris|total_kaina|
+---------+-------------+------------------+-----------+
|       94|          663|        216402.237|    1752.48|
|       69|         1881|135239.58100000003|    1662.88|
|      814|         1075|23038.590000000004|    1586.58|
+---------+-------------+------------------+-----------+

Top 3 maršrutai pagal siuntų skaičių:
+---------+-------------+------------------+-----------+
|marsrutas|total_siuntos|      total_svoris|total_kaina|
+---------+-------------+------------------+-----------+
|      121|         7842|31708.965000000007|    1018.71|
|      105|         7171|20034.024999999998|      868.1|
|      232|         6938|          31679.47|    1102.17|
+---------+-------------+------------------+-----------+

Top 3 maršrutai pagal bendrą svorį:
+---------+-------------+------------------+-----------+
|marsrutas|total_siuntos|      tot

In [8]:
# Antras uždavinys
clean_df = aggregated.filter(
    (col("bendras_siuntu_skaicius") > 0) &
    (col("bendras_svoris") > 0) &
    (col("pilna_kaina") > 0)
)

In [9]:
assembler = VectorAssembler(
    inputCols=["bendras_siuntu_skaicius", "bendras_svoris"],
    outputCol="features"
)

assembled = assembler.transform(clean_df).select("features", "pilna_kaina")

In [10]:
lr = LinearRegression(featuresCol="features", labelCol="pilna_kaina")
lr_model = lr.fit(assembled)
results = lr_model.evaluate(assembled)
print(f"Koeficientai: {lr_model.coefficients}")
print(f"beta0: {lr_model.intercept}")
print(f"R^2: {results.r2:.4f}")
print(f"RMSE: {results.rootMeanSquaredError:.4f}")

Koeficientai: [0.055931357320565,0.004087852799717063]
beta0: 25.801254176885177
R^2: 0.2590
RMSE: 12.9059


In [11]:
# Trečias uždavinys
geo = df.select("marsrutas", "sustojimo data", "geografine zona") \
        .groupBy("marsrutas", "sustojimo data") \
        .agg(first("geografine zona").alias("geografine_zona"))

extended = aggregated.join(geo, on=["marsrutas", "sustojimo data"])

In [12]:
indexer = StringIndexer(inputCol="geografine_zona", outputCol="geo_index")
encoder = OneHotEncoder(inputCols=["geo_index"], outputCols=["geo_ohe"])

indexed = indexer.fit(extended).transform(extended)
encoded = encoder.fit(indexed).transform(indexed)

In [13]:
assembler2 = VectorAssembler(
    inputCols=["bendras_siuntu_skaicius", "bendras_svoris", "geo_ohe"],
    outputCol="features"
)
assembled2 = assembler2.transform(encoded).select("features", "pilna_kaina")


In [14]:
lr2 = LinearRegression(featuresCol="features", labelCol="pilna_kaina")
model2 = lr2.fit(assembled2)
results2 = model2.evaluate(assembled2)

print(f"Koeficientai: {model2.coefficients}")
print(f"beta0: {model2.intercept}")
print(f"R^2: {results2.r2:.4f}")
print(f"RMSE: {results2.rootMeanSquaredError:.4f}")

Koeficientai: [0.07495242839902258,0.004573756213904592,-10.312014393201618,-0.024153649450281]
beta0: 29.463780323795586
R^2: 0.3620
RMSE: 12.0365


In [15]:
indexer = StringIndexer(inputCol="geografine zona", outputCol="geo_index")
model = indexer.fit(df)
print(model.labels)

['Z1', 'Z3', 'Z2']
