In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, sum, first, count, desc, to_date
from pyspark.sql.types import DoubleType, IntegerType, DateType




spark = SparkSession.builder \
    .appName("LogistikosAnalize") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

data_path = 'StructuredData.txt'

df = spark.read.csv(data_path, header=True, inferSchema=True)

# Duomenų tipų konvertavimas ir paruošimas
# Svarbu: Reikia užtikrinti, kad 'kaina procentas' nebūtų 0 arba NULL, kad išvengti klaidų dalinant iš nulio.
df_prepared = df.withColumn("sustojimo data", to_date(col("sustojimo data"), "yyyy-MM-dd")) \
                .withColumn("siuntu skaicius", col("siuntu skaicius").cast(IntegerType())) \
                .withColumn("svoris", col("svoris").cast(DoubleType())) \
                .withColumn("kaina procentas", col("kaina procentas").cast(DoubleType())) \
                .withColumn("kaina vienetais", col("kaina vienetais").cast(DoubleType()))

# Filtruojame eilutes, kur 'kaina procentas' yra netinkama (<= 0) arba NULL
df_filtered = df_prepared.filter((col("kaina procentas").isNotNull()) & (col("kaina procentas") > 0))


# Agreguojame duomenis pagal maršrutą ir dieną
daily_aggregated_df = df_filtered.groupBy("marsrutas", "sustojimo data") \
    .agg(
        sum("siuntu skaicius").alias("dienos_siuntu_skaicius"),
        sum("svoris").alias("dienos_svoris"),
        (sum("kaina vienetais")/sum("kaina procentas")).alias("pilna_dienos_kaina")
    )


print("Agreguota lentelė pagal maršrutą ir dieną:")
daily_aggregated_df.sort("marsrutas") \
    .select(
        col("marsrutas"),
        col("sustojimo data"),
        col("dienos_siuntu_skaicius"),
        round(col("dienos_svoris"), 2).alias("dienos_svoris"),
        round(col("pilna_dienos_kaina"),2).alias("pilna_dienos_kaina"),
     ) \
    .show(10)



# TOP 3 pagal didžiausią kainą
top3_kaina = daily_aggregated_df.orderBy(desc("pilna_dienos_kaina")).limit(3)
print("\nTOP 3 maršrutai pagal didžiausią bendrą kainą:")
top3_kaina.select(
    col("marsrutas"),
    col("dienos_siuntu_skaicius"),
    round(col("dienos_svoris"), 2).alias("visas_svoris"),
    round(col("pilna_dienos_kaina"), 2).alias("visa_kaina")
).show(truncate=False)

# TOP 3 pagal didžiausią siuntų skaičių
top3_siuntos = daily_aggregated_df.orderBy(desc("dienos_siuntu_skaicius")).limit(3)
print("\nTOP 3 maršrutai pagal didžiausią siuntų skaičių:")
top3_siuntos.select(
    col("marsrutas"),
    col("dienos_siuntu_skaicius"),
    round(col("dienos_svoris"), 2).alias("visas_svoris"),
    round(col("pilna_dienos_kaina"), 2).alias("visa_kaina")
).show(truncate=False)

# TOP 3 pagal didžiausią svorį
top3_svoris = daily_aggregated_df.orderBy(desc("dienos_svoris")).limit(3)
print("\nTOP 3 maršrutai pagal didžiausią bendrą svorį:")
top3_svoris.select(
    col("marsrutas"),
    col("dienos_siuntu_skaicius"),
    round(col("dienos_svoris"), 2).alias("visas_svoris"),
    round(col("pilna_dienos_kaina"), 2).alias("visa_kaina")
).show(truncate=False)

Agreguota lentelė pagal maršrutą ir dieną:
+---------+--------------+----------------------+-------------+------------------+
|marsrutas|sustojimo data|dienos_siuntu_skaicius|dienos_svoris|pilna_dienos_kaina|
+---------+--------------+----------------------+-------------+------------------+
|       32|    2018-01-18|                   120|        318.2|             25.57|
|       32|    2018-01-05|                   140|       377.95|             24.07|
|       32|    2018-01-16|                   156|       308.35|             26.58|
|       32|    2018-01-22|                   125|        340.9|             21.33|
|       32|    2018-01-17|                   159|       474.95|             28.79|
|       32|    2018-01-30|                   140|       354.75|             27.54|
|       32|    2018-01-09|                   161|       471.15|             25.05|
|       32|    2018-01-03|                   272|        637.9|             33.47|
|       32|    2018-01-26|                  

In [6]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import col, abs
import numpy as np # Reikalingas koeficientų absoliučioms reikšmėms gauti


# Užtikriname, kad nėra NULL reikšmių ir kaina yra teigiama
model_data = daily_aggregated_df.filter(
    col("dienos_siuntu_skaicius").isNotNull() &
    col("dienos_svoris").isNotNull() &
    col("pilna_dienos_kaina").isNotNull() &
    (col("pilna_dienos_kaina") > 0) # Papildomas patikrinimas stabilumui
)

# 1. Požymių paruošimas - sujungimas į vektorių
feature_cols = ["dienos_siuntu_skaicius", "dienos_svoris"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="unscaled_features")
data_assembled = assembler.transform(model_data)

# 2. Požymių standartizavimas (būtina teisingam svarbos palyginimui)
# Standartizavimas daro požymių vidurkį 0 ir standartinį nuokrypį 1
scaler = StandardScaler(inputCol="unscaled_features", outputCol="features", withStd=True, withMean=True)
scaler_model = scaler.fit(data_assembled)
data_scaled = scaler_model.transform(data_assembled)

# Pasirenkame tik modeliui reikalingus stulpelius
final_data = data_scaled.select("features", col("pilna_dienos_kaina").alias("label"))

# 3. Tiesinės regresijos modelio sukūrimas ir apmokymas (visais turimais duomenimis)
lr = LinearRegression(featuresCol='features', labelCol='label')
lr_model = lr.fit(final_data)

# 4. Rezultatų spausdinimas (tik formulė ir svarba)

# Gauname koeficientus ir laisvąjį narį
coefficients = lr_model.coefficients.toArray() # Konvertuojame į NumPy masyvą
intercept = lr_model.intercept


formula = f"std_pilna_dienos_kaina = {intercept:.4f}"
formula += f" + ({coefficients[0]:.4f} * std_dienos_siuntu_skaicius)"
formula += f" + ({coefficients[1]:.4f} * std_dienos_svoris)"

print("Tiesinės regresijos rezultatai\n")
print(formula)

print(f"Number of training instances: {lr_model.summary.numInstances}")
print(f"R^2: {lr_model.summary.r2:.4f}")
print(f"Vidutinė kvadratinė paklaida (RMSE): {lr_model.summary.rootMeanSquaredError:.4f}")

# 5. Faktorių svarbumo nustatymas

print(f"\nFaktorius, darantis didžiausią įtaką kainai yra: dienos svoris su {coefficients[1]:.4f} reikšme")

Tiesinės regresijos rezultatai

std_pilna_dienos_kaina = 36.9328 + (3.7556 * std_dienos_siuntu_skaicius) + (6.7541 * std_dienos_svoris)
Number of training instances: 7908
R^2: 0.2589
Vidutinė kvadratinė paklaida (RMSE): 12.9066

Faktorius, darantis did=iausi1 įtaką kainai yra: dienos svoris su 6.7541 reikšme


In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.ml.regression import GBTRegressor
from pyspark.sql.functions import col, sum, count, expr
from pyspark.sql import functions as F

# 1. Sukuriame papildomą požymį: vidutinė kaina už siuntą (galimas stiprus prognozuojantis faktorius)
daily_aggregated_df_improved = df_filtered.groupBy("marsrutas", "sustojimo data") \
    .agg(
        sum("siuntu skaicius").alias("dienos_siuntu_skaicius"),
        sum("svoris").alias("dienos_svoris"),
        (sum("kaina vienetais") / sum("kaina procentas")).alias("pilna_dienos_kaina"),
        count("*").alias("dienos_sustojimu_skaicius")
    ) \
    .withColumn("vid_kaina_uz_siunta", F.when(col("dienos_siuntu_skaicius") > 0, col("pilna_dienos_kaina") / col("dienos_siuntu_skaicius")).otherwise(0.0)) \
    .withColumn("vid_svoris_uz_siunta", F.when(col("dienos_siuntu_skaicius") > 0, col("dienos_svoris") / col("dienos_siuntu_skaicius")).otherwise(0.0))


# 2. IQR filtravimas (pašaliname išskirtis pagal pilna_dienos_kaina)
quantiles = daily_aggregated_df_improved.approxQuantile("pilna_dienos_kaina", [0.25, 0.75], 0.01)
if quantiles and len(quantiles) == 2:
    Q1, Q3 = quantiles
    IQR_val = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR_val
    upper_bound = Q3 + 1.5 * IQR_val
    data_no_outliers = daily_aggregated_df_improved.filter(
        (col("pilna_dienos_kaina") >= lower_bound) & (col("pilna_dienos_kaina") <= upper_bound)
    )
else:
    data_no_outliers = daily_aggregated_df_improved

# 3. Null ir neigiamų reikšmių šalinimas
model_data_ready = data_no_outliers.filter(
    (col("dienos_siuntu_skaicius").isNotNull()) &
    (col("dienos_svoris").isNotNull()) &
    (col("pilna_dienos_kaina") > 0) &
    (col("dienos_sustojimu_skaicius").isNotNull()) &
    (col("vid_kaina_uz_siunta").isNotNull()) &
    (col("vid_svoris_uz_siunta").isNotNull())
)
print(f"Duomenų eilučių skaičius po filtravimo: {model_data_ready.count()}")


# 4. Požymių paruošimas (įtraukiame ir naujus požymius)
feature_cols = [
    "dienos_siuntu_skaicius",
    "dienos_svoris",
    "dienos_sustojimu_skaicius",
    "vid_kaina_uz_siunta",
    "vid_svoris_uz_siunta"
]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="unscaled_features")
scaler = StandardScaler(inputCol="unscaled_features", outputCol="features", withStd=True, withMean=True)

pipeline_preprocessing = Pipeline(stages=[assembler, scaler])
pipeline_model = pipeline_preprocessing.fit(model_data_ready)
final_data_improved = pipeline_model.transform(model_data_ready).select("features", col("pilna_dienos_kaina").alias("label"))


# 5. Naudojame Ridge regresiją (t.y. L2 reguliarizaciją), kad modelis būtų stabilesnis
lr_improved = LinearRegression(featuresCol="features", labelCol="label", regParam=0.1, elasticNetParam=0.0)
lr_model_improved = lr_improved.fit(final_data_improved)

# 6. Modelio rezultatų spausdinimas
print("\n\n--- PATOBULINTAS MODELIS SU IŠVESTINIAIS POŽYMIAIS ---")
coefficients_imp = lr_model_improved.coefficients.toArray()
intercept_imp = lr_model_improved.intercept
summary_imp = lr_model_improved.summary

# Formulė
formula_imp = f"pilna_dienos_kaina = {intercept_imp:.4f}"
for i, feat in enumerate(feature_cols):
    formula_imp += f" + ({coefficients_imp[i]:.4f} * std_{feat})"

print(formula_imp)
print(f"\nNaudotų požymių kiekis: {len(feature_cols)}")
print(f"R^2: {summary_imp.r2:.4f}")
print(f"RMSE: {summary_imp.rootMeanSquaredError:.4f}")
print(f"Training Instances: {summary_imp.numInstances}")


Duomenų eilučių skaičius po filtravimo: 7564


--- PATOBULINTAS MODELIS SU IŠVESTINIAIS POŽYMIAIS ---
pilna_dienos_kaina = 35.4793 + (0.4154 * std_dienos_siuntu_skaicius) + (8.1422 * std_dienos_svoris) + (7.1081 * std_dienos_sustojimu_skaicius) + (5.6979 * std_vid_kaina_uz_siunta) + (-3.4147 * std_vid_svoris_uz_siunta)

Naudotų požymių kiekis: 5
R^2: 0.4931
RMSE: 8.1631
Training Instances: 7564
