# 1. Data integration (3 pts)

- Dataset integration: incorporate the selected weather information appropriately.
- Sampling: create a sample of the dataset (e.g. 10% of its size) while preserving the distribution of the target attribute.
- Split the dataset into a training and a test set (e.g. in a 60/40 ratio).
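
The sampling and splitting steps can be illustrated without Spark. The sketch below is a plain-Python illustration, not the notebook's implementation: `stratified_sample` and `train_test_split` are hypothetical helpers, and the rows are made up. It draws the same fraction from every class of the target, so the class proportions of the sample match those of the full data, and then cuts the shuffled sample 60/40.

```python
import random
from collections import Counter, defaultdict


def stratified_sample(rows, label_of, fraction, seed=42):
    """Draw the same fraction of rows from every class of the label."""
    rng = random.Random(seed)
    by_label = defaultdict(list)
    for row in rows:
        by_label[label_of(row)].append(row)
    sample = []
    for label in sorted(by_label):
        group = by_label[label]
        k = max(1, round(len(group) * fraction))  # never drop a class entirely
        sample.extend(rng.sample(group, k))
    return sample


def train_test_split(rows, train_fraction=0.6, seed=42):
    """Shuffle the rows and cut them into a training and a test part."""
    rng = random.Random(seed)
    shuffled = list(rows)
    rng.shuffle(shuffled)
    cut = round(len(shuffled) * train_fraction)
    return shuffled[:cut], shuffled[cut:]


# Made-up data: (row id, severity) with a skewed class distribution
rows = (
    [(i, 3) for i in range(800)]
    + [(i, 2) for i in range(800, 950)]
    + [(i, 1) for i in range(950, 1000)]
)
sample = stratified_sample(rows, label_of=lambda r: r[1], fraction=0.1)
train, test = train_test_split(sample)
print(Counter(label for _, label in sample))
print(len(train), len(test))
```

In Spark, `DataFrame.stat.sampleBy` samples each row independently, so the per-class sizes are only approximately equal to the requested fraction, and `DataFrame.randomSplit([0.6, 0.4], seed=42)` plays the role of the split helper.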

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [2]:
def read_csv_files(spark: SparkSession, file_paths: list[str], infer_schema:bool=True, header:bool=True, on:str="id", how:str="inner") -> DataFrame:
    """Read the CSV files and join them one after another on the `on` column."""
    if len(file_paths) == 0:
        # No input files: return an empty placeholder DataFrame
        schema = StructType([
            StructField("id", IntegerType(), True),
            StructField("name", StringType(), True)
        ])
        df = spark.createDataFrame([],schema)
    else:
        df = spark.read.csv(file_paths[0], header=header, inferSchema=infer_schema)
        for idx in range(1,len(file_paths)):
            file = spark.read.csv(file_paths[idx], header=header, inferSchema=infer_schema)
            # Vehicle_Reference can occur in more than one file; suffix it with the
            # file position so the joined DataFrame has no duplicate column names
            file = file.withColumnRenamed("Vehicle_Reference", f"Vehicle_Reference_{idx}")
            df = df.join(file, on=on, how=how)
            # Debug output: columns after each join
            print(df.columns)
            print(f'df{idx}--------------')
    return df

In [3]:
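def sample_by_percent(df: DataFrame, label: str, percent: float) -> DataFrame:
    """Stratified sample: every class of `label` is sampled with the same fraction,
    so the class distribution is preserved approximately."""
    # Map each distinct label value to the same sampling fraction
    fractions = df.select(label).distinct().rdd.map(lambda r: (r[0], percent)).collectAsMap()
    return df.stat.sampleBy(label, fractions, seed=42)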

In [4]:
spark = SparkSession.builder.appName("CarAccidents").getOrCreate()

file_list = [
    "../datalab/TSVD/dataset/CarAccidents/Accidents.csv",
    "../datalab/TSVD/dataset/CarAccidents/Casualties.csv",
    "../datalab/TSVD/dataset/CarAccidents/Vehicles.csv"
]

df = read_csv_files(spark, file_list, on="Accident_Index",how="inner")

['Accident_Index', 'Location_Easting_OSGR', 'Location_Northing_OSGR', 'Longitude', 'Latitude', 'Police_Force', 'Accident_Severity', 'Number_of_Vehicles', 'Number_of_Casualties', 'Date', 'Day_of_Week', 'Time', 'Local_Authority_(District)', 'Local_Authority_(Highway)', '1st_Road_Class', '1st_Road_Number', 'Road_Type', 'Speed_limit', 'Junction_Detail', 'Junction_Control', '2nd_Road_Class', '2nd_Road_Number', 'Pedestrian_Crossing-Human_Control', 'Pedestrian_Crossing-Physical_Facilities', 'Light_Conditions', 'Weather_Conditions', 'Road_Surface_Conditions', 'Special_Conditions_at_Site', 'Carriageway_Hazards', 'Urban_or_Rural_Area', 'Did_Police_Officer_Attend_Scene_of_Accident', 'LSOA_of_Accident_Location', 'Vehicle_Reference_1', 'Casualty_Reference', 'Casualty_Class', 'Sex_of_Casualty', 'Age_of_Casualty', 'Age_Band_of_Casualty', 'Casualty_Severity', 'Pedestrian_Location', 'Pedestrian_Movement', 'Car_Passenger', 'Bus_or_Coach_Passenger', 'Pedestrian_Road_Maintenance_Worker', 'Casualty_Type', 

In [14]:

sampled_data = sample_by_percent(df,"Accident_Severity",0.1)
print(f"Percent for sample data: {(sampled_data.count()/df.count())*100}")
# sampled_data

DataFrame[Accident_Index: string, Location_Easting_OSGR: int, Location_Northing_OSGR: int, Longitude: double, Latitude: double, Police_Force: int, Accident_Severity: int, Number_of_Vehicles: int, Number_of_Casualties: int, Date: string, Day_of_Week: int, Time: timestamp, Local_Authority_(District): int, Local_Authority_(Highway): string, 1st_Road_Class: int, 1st_Road_Number: int, Road_Type: int, Speed_limit: int, Junction_Detail: int, Junction_Control: int, 2nd_Road_Class: int, 2nd_Road_Number: int, Pedestrian_Crossing-Human_Control: int, Pedestrian_Crossing-Physical_Facilities: int, Light_Conditions: int, Weather_Conditions: int, Road_Surface_Conditions: int, Special_Conditions_at_Site: int, Carriageway_Hazards: int, Urban_or_Rural_Area: int, Did_Police_Officer_Attend_Scene_of_Accident: int, LSOA_of_Accident_Location: string, Vehicle_Reference_1: int, Casualty_Reference: int, Casualty_Class: int, Sex_of_Casualty: int, Age_of_Casualty: int, Age_Band_of_Casualty: int, Casualty_Severity:

In [22]:
# train_data, test_data = df.randomSplit([0.6, 0.4], seed=42)
from pyspark.sql import functions as F

print(sampled_data.columns)

# Approximate number of distinct values in every column, computed in one aggregation
agg_result = sampled_data.agg(*[
    F.approx_count_distinct(c).alias(c)
    for c in sampled_data.columns
]).first().asDict()

# print("Unique values per column:")
# Drop high-cardinality columns; the loop variables are named so that the
# imported `col` function is not shadowed
for col_name, n_distinct in agg_result.items():
    if n_distinct > 9999:
        sampled_data = sampled_data.drop(col_name)
        print(f"{col_name}: {n_distinct} dropped")
    else:
        print(f"{col_name}: {n_distinct}")


['Accident_Index', 'Location_Easting_OSGR', 'Location_Northing_OSGR', 'Longitude', 'Latitude', 'Police_Force', 'Accident_Severity', 'Number_of_Vehicles', 'Number_of_Casualties', 'Date', 'Day_of_Week', 'Time', 'Local_Authority_(District)', 'Local_Authority_(Highway)', '1st_Road_Class', '1st_Road_Number', 'Road_Type', 'Speed_limit', 'Junction_Detail', 'Junction_Control', '2nd_Road_Class', '2nd_Road_Number', 'Pedestrian_Crossing-Human_Control', 'Pedestrian_Crossing-Physical_Facilities', 'Light_Conditions', 'Weather_Conditions', 'Road_Surface_Conditions', 'Special_Conditions_at_Site', 'Carriageway_Hazards', 'Urban_or_Rural_Area', 'Did_Police_Officer_Attend_Scene_of_Accident', 'LSOA_of_Accident_Location', 'Vehicle_Reference_1', 'Casualty_Reference', 'Casualty_Class', 'Sex_of_Casualty', 'Age_of_Casualty', 'Age_Band_of_Casualty', 'Casualty_Severity', 'Pedestrian_Location', 'Pedestrian_Movement', 'Car_Passenger', 'Bus_or_Coach_Passenger', 'Pedestrian_Road_Maintenance_Worker', 'Casualty_Type', 

In [23]:
# df_cleaned = df.dropna()

sd = sampled_data.drop('Date')
# sd = sd.drop('Accident_Index')
# sd = sd.drop('Local_Authority_(Highway)')
# sd = sd.drop('LSOA_of_Accident_Location')

for col_name in sd.columns:
    col_type = sd.schema[col_name].dataType
    if isinstance(col_type, StringType):
        print(f'{col_name}: {col_type}')
# sd.printSchema()

Local_Authority_(Highway): StringType()


# 2. Preprocessing (7 pts)

- Transform nominal attributes to numeric ones
- Transform numeric attributes to nominal ones
- Compute a ratio criterion, the information gain with respect to the target attribute (classification task), for nominal attributes
- Compute statistics for numeric attributes
- Create histograms for nominal attributes
- Handle missing values (e.g. replace them with means, etc.)
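
For reference, information gain can be computed directly from entropies. The sketch below is plain Python on made-up toy data, with hypothetical helper names (`entropy`, `information_gain`); it is not the notebook's method, which reports the approximation `chi2 / (2 * N)` based on the chi-squared statistic. It computes the entropy of the target minus the weighted entropy of the target within each value of the feature.

```python
import math
from collections import Counter


def entropy(labels):
    """Shannon entropy (in bits) of a list of class labels."""
    total = len(labels)
    return -sum((n / total) * math.log2(n / total) for n in Counter(labels).values())


def information_gain(feature_values, labels):
    """H(label) minus the weighted entropy of the label within each feature value."""
    total = len(labels)
    groups = {}
    for value, label in zip(feature_values, labels):
        groups.setdefault(value, []).append(label)
    conditional = sum(len(g) / total * entropy(g) for g in groups.values())
    return entropy(labels) - conditional


# Made-up toy data
severity = [1, 1, 2, 2, 3, 3, 3, 3]
road_type = ["a", "a", "b", "b", "c", "c", "c", "c"]  # determines severity completely
weekday = ["x", "y", "x", "y", "x", "y", "x", "y"]    # carries no information here

print(information_gain(road_type, severity))
print(information_gain(weekday, severity))
```

A feature that fully determines the target gets a gain equal to the target's entropy, while a feature whose values all show the same class mix gets a gain of zero.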

In [122]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, count, mean, when, isnull, isnan, lit
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, QuantileDiscretizer, Imputer
from pyspark.mllib.stat import Statistics
import matplotlib.pyplot as plt
import pandas as pd


def preprocess_data(df: DataFrame) -> DataFrame:
    """Main preprocessing function implementing all requested tasks"""
    
    # 1. Identify column types
    categorical_cols = [field.name for field in df.schema.fields 
                      if isinstance(field.dataType, StringType)]
    numerical_cols = [field.name for field in df.schema.fields 
                     if isinstance(field.dataType, (IntegerType, FloatType, DoubleType))]
    numerical_cols.remove('Accident_Severity')
    
    # 2. Transform nominal attributes to numerical
    indexers = [StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid="keep") 
               for col in categorical_cols]
    
    pipeline_stages = indexers
    
    # 3. Transform numerical attributes to nominal (binning)
    for num_col in numerical_cols:
        if num_col != "Accident_Index":  # Skip ID columns
            discretizer = QuantileDiscretizer(numBuckets=5, 
                                            inputCol=num_col, 
                                            outputCol=num_col+"_category",
                                            handleInvalid="keep",
                                            relativeError=0.01)
            pipeline_stages.append(discretizer)
    
    # 4. Information Gain calculation will be done separately
    
    # Cache the input DataFrame if used multiple times
    df.cache()  # or df.persist() with specific storage level
    pipeline = Pipeline(stages=pipeline_stages)
    pipeline_model = pipeline.fit(df)  # Fit once
    preprocessed_df = pipeline_model.transform(df)  # Transform
    
    # Unpersist when done
    df.unpersist()
    
    return preprocessed_df

def calculate_information_gain(df: DataFrame, target_col: str = "Accident_Severity"):
    """Rank nominal attributes against the target using chi-squared statistics.

    The reported "Information_Gain" is the approximation chi2 / (2 * N),
    not an entropy-based information gain. Uses the global `spark` session.
    """
    from pyspark.ml.feature import ChiSqSelector
    from pyspark.ml.stat import ChiSquareTest

    # Indexed categorical columns followed by binned numerical columns
    categorical_index_cols = [c for c in df.columns if c.endswith("_index")]
    binned_cols = [c for c in df.columns if c.endswith("_category")]
    categorical_index_cols = categorical_index_cols + binned_cols

    # Create feature vector
    assembler = VectorAssembler(
        inputCols=categorical_index_cols,
        outputCol="features")
    feature_df = assembler.transform(df)

    # Keep the 10 features with the strongest chi-squared association to the target
    selector = ChiSqSelector(numTopFeatures=10,
                           featuresCol="features",
                           outputCol="selectedFeatures",
                           labelCol=target_col)
    model = selector.fit(feature_df)
    result = model.transform(feature_df)

    # Map the selected vector positions back to column names
    best_cols_indices = set(model.selectedFeatures)
    selected_cols = [c for idx, c in enumerate(categorical_index_cols) if idx in best_cols_indices]

    n_rows = df.count()  # counted once instead of once per column

    ig_results = []
    for c in selected_cols:
        # ChiSquareTest expects a vector column, so wrap the single feature
        single_assembler = VectorAssembler(inputCols=[c], outputCol="feature_vec")
        df_vec = single_assembler.transform(df)
        chi2 = ChiSquareTest.test(df_vec, "feature_vec", target_col).head()
        statistic = float(chi2.statistics.toArray()[0])

        ig_results.append({
            "Feature": str(c),
            "Chi2_Statistic": statistic,
            "pValue": float(chi2.pValues[0]),
            "DegreesOfFreedom": chi2.degreesOfFreedom,
            "Information_Gain": statistic / (2 * n_rows)
        })

    # Convert to DataFrame and sort by the approximate IG
    ig_df = spark.createDataFrame(ig_results).orderBy("Information_Gain", ascending=False)
    ig_df.show(truncate=False)

    return model, result

def calculate_numerical_stats(df: DataFrame):
    """Calculate statistics for numerical attributes"""
    from pyspark.sql.types import NumericType

    # Pick columns by data type: string and timestamp columns cannot be converted
    # with float() below. The generated "_index"/"_category" columns are excluded.
    numerical_cols = [field.name for field in df.schema.fields
                     if isinstance(field.dataType, NumericType)
                     and not field.name.endswith(("_index", "_category"))]
    
    # Basic statistics
    stats = df.select(numerical_cols).describe().toPandas().set_index('summary')
    print("Basic statistics:")
    print(stats)
    
    # Correlation matrix (rows with missing values are dropped, float(None) would fail)
    numerical_df = df.select(numerical_cols).dropna()
    corr_matrix = Statistics.corr(numerical_df.rdd.map(lambda row: [float(x) for x in row]), method="pearson")
    corr_df = pd.DataFrame(corr_matrix, columns=numerical_cols, index=numerical_cols)
    print("\nCorrelation matrix:")
    print(corr_df)
    
    return stats, corr_df

def create_histograms(df: DataFrame, nominal_cols: list):
    """Create histograms for nominal attributes"""
    for col_name in nominal_cols:
        if col_name in df.columns:
            # Get value counts
            value_counts = df.groupBy(col_name).count().orderBy("count", ascending=False).toPandas()
            
            # Plot histogram
            plt.figure(figsize=(10, 6))
            plt.bar(value_counts[col_name].astype(str), value_counts["count"])
            plt.title(f"Histogram of {col_name}")
            plt.xlabel(col_name)
            plt.ylabel("Count")
            plt.xticks(rotation=45)
            plt.show()

In [117]:
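# testing ground for showing ig
# NOTE: show_ig_by_col is not defined in the cells shown here. Judging by the
# exception below, the raw string column would have to be replaced by its
# "_index" counterpart produced by preprocess_data.
ig_cols = []
for col_name in sd.columns:
    col_type = sd.schema[col_name].dataType
    if isinstance(col_type, StringType):
        ig_cols.append(col_name)
        print(f'{col_name}: {col_type}')

target_col = 'Accident_Severity'
show_ig_by_col(sd, ig_cols, target_col)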

Local_Authority_(Highway): StringType()


IllegalArgumentException: Data type string of column Local_Authority_(Highway) is not supported.

In [65]:
# 3. Preprocess data
%matplotlib inline
# print(sd.columns)
preprocessed_df = preprocess_data(sd)
print('preprocessing done')

preprocessing done


In [52]:
# cols = ["age_index", "Age_Index", "Accident_Index", "income_index"]
# [index_col for index_col in cols if index_col.endswith("_index")]
# preprocessed_df.printSchema()
for col_name in preprocessed_df.columns:
    if 'category' in col_name:
        col_type = preprocessed_df.schema[col_name].dataType
        # print(col)
        # print(col_type)
# [index_col for index_col in cols if index_col.endswith("_index")]

In [53]:
# preprocessed_df.columns

# Split data
# train_data, test_data = preprocessed_df.randomSplit([0.6, 0.4], seed=42)
# print('splitting done')

In [123]:
# 4. Calculate information gain
print("\nInformation Gain Analysis:")
ig_model, results = calculate_information_gain(preprocessed_df)


Information Gain Analysis:
+------------------+----------------+-----------------------------------+---------------------+------+
|Chi2_Statistic    |DegreesOfFreedom|Feature                            |Information_Gain     |pValue|
+------------------+----------------+-----------------------------------+---------------------+------+
|10634.363599288286|[6]             |Speed_limit_category               |0.012387346560442786 |0.0   |
|10373.552110504139|[4]             |Number_of_Casualties_category      |0.01208354255107729  |0.0   |
|9717.734724935726 |[410]           |Local_Authority_(Highway)_index    |0.011319619335592807 |0.0   |
|4496.3925122187475|[4]             |Number_of_Vehicles_category        |0.005237583970173943 |0.0   |
|1707.5663716821252|[4]             |Road_Type_category                 |0.0019890439351161523|0.0   |
|1607.2833538624996|[6]             |1st_Road_Class_category            |0.0018722301282286485|0.0   |
|1177.2397018903887|[8]             |Police_F

In [70]:
results.select('selectedFeatures').take(5)

[Row(selectedFeatures=DenseVector([109.0, 0.0, 1.0, 1.0, 2.0, 0.0, 2.0, 2.0, 2.0, 1.0])),
 Row(selectedFeatures=DenseVector([109.0, 0.0, 1.0, 1.0, 3.0, 0.0, 1.0, 1.0, 2.0, 1.0])),
 Row(selectedFeatures=DenseVector([109.0, 0.0, 0.0, 1.0, 2.0, 0.0, 2.0, 3.0, 2.0, 1.0])),
 Row(selectedFeatures=DenseVector([109.0, 0.0, 2.0, 2.0, 2.0, 0.0, 1.0, 4.0, 2.0, 1.0])),
 Row(selectedFeatures=DenseVector([109.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 4.0, 2.0, 1.0]))]

In [None]:
# 5. Calculate numerical statistics
# last error here
print("\nNumerical Statistics:")
stats, corr_matrix = calculate_numerical_stats(preprocessed_df)

In [40]:

# 6. Create histograms
print("\nCreating histograms...")
categorical_cols = [col for col in preprocessed_df.columns 
                   if isinstance(preprocessed_df.schema[col].dataType, StringType)]
create_histograms(preprocessed_df, categorical_cols[:5])  # First 5 to avoid too many plots

spark.stop()

# 3. Modeling - Building descriptive models (3 pts):

- Create a k-means clustering model
- Use the trained model to detect anomalies
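
One common way to turn a k-means model into an anomaly detector is to flag points that lie far from their nearest cluster center. The sketch below is a minimal plain-Python illustration under stated assumptions: the 2-D points are made up, the first `k` points serve as initial centers, and the distance threshold is chosen by hand. In the notebook's setting, `pyspark.ml.clustering.KMeans` would take the place of the hypothetical `kmeans` helper.

```python
import math


def kmeans(points, k, iterations=20):
    """Lloyd's algorithm with the first k points as initial centers."""
    centers = [tuple(p) for p in points[:k]]
    for _ in range(iterations):
        clusters = [[] for _ in range(k)]
        for p in points:
            nearest = min(range(k), key=lambda i: math.dist(p, centers[i]))
            clusters[nearest].append(p)
        for i, members in enumerate(clusters):
            if members:  # keep the old center if a cluster went empty
                centers[i] = tuple(sum(axis) / len(members) for axis in zip(*members))
    return centers


def distance_to_nearest_center(point, centers):
    """Anomaly score: distance from the point to its closest center."""
    return min(math.dist(point, c) for c in centers)


# Made-up data: two tight groups plus one point far from both
points = [
    (0.0, 0.0), (10.0, 10.0), (0.1, 0.2), (0.2, 0.1),
    (9.9, 10.1), (10.2, 9.8), (5.0, 20.0),
]
centers = kmeans(points, k=2)
threshold = 5.0  # assumed cutoff; a high percentile of the scores is another option
anomalies = [p for p in points if distance_to_nearest_center(p, centers) > threshold]
print(anomalies)
```

The outlier also pulls its cluster center toward itself, which is why the threshold has to leave some slack for the regular points of that cluster.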

# 4. Modeling - Building classification models (at least one model of each type) (4 pts):

- Decision tree model
- Linear SVM
- Naive Bayes model
- Ensembles of decision trees (Random Forests, Gradient-boosted trees)

# 5. Evaluation (3 pts)

- Train the classification model on the training set and evaluate it on the test set.
- Evaluate the classification model using a contingency table (confusion matrix) and by computing precision, recall, F1 and MCC (Matthews Correlation Coefficient).
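
The metrics named above can all be derived from the contingency table. The sketch below is plain Python on made-up labels, with hypothetical helper names; it builds the table for a three-class target, computes one-vs-rest precision, recall and F1 for a chosen class, and uses the multiclass form of MCC.

```python
import math
from collections import Counter


def confusion_matrix(y_true, y_pred, labels):
    """Rows are actual classes, columns are predicted classes."""
    counts = Counter(zip(y_true, y_pred))
    return [[counts[(actual, predicted)] for predicted in labels] for actual in labels]


def precision_recall_f1(matrix, index):
    """One-vs-rest precision, recall and F1 for the class at `index`."""
    tp = matrix[index][index]
    predicted = sum(row[index] for row in matrix)
    actual = sum(matrix[index])
    precision = tp / predicted if predicted else 0.0
    recall = tp / actual if actual else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1


def mcc(matrix):
    """Multiclass Matthews correlation coefficient from a confusion matrix."""
    total = sum(map(sum, matrix))
    correct = sum(matrix[i][i] for i in range(len(matrix)))
    actual = [sum(row) for row in matrix]
    predicted = [sum(column) for column in zip(*matrix)]
    numerator = correct * total - sum(a * p for a, p in zip(actual, predicted))
    denominator = math.sqrt(
        (total**2 - sum(p * p for p in predicted)) * (total**2 - sum(a * a for a in actual))
    )
    return numerator / denominator if denominator else 0.0


# Made-up labels for a three-class target such as Accident_Severity
labels = [1, 2, 3]
y_true = [1, 1, 2, 2, 2, 3, 3, 3, 3, 3]
y_pred = [1, 2, 2, 2, 3, 3, 3, 3, 3, 2]

matrix = confusion_matrix(y_true, y_pred, labels)
print(matrix)
print(precision_recall_f1(matrix, index=2))
print(mcc(matrix))
```

Unlike accuracy, MCC stays near zero for a classifier that always predicts the majority class, which matters for a skewed target.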