In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('DataFram').getOrCreate() 

In [3]:
spark

In [4]:
# Read DataSet

df = spark.read.option('header', 'true').csv('train.csv', inferSchema = True)
#show() para verlo
df.printSchema()
#dtypes

df.head(4)

root
 |-- id: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- levy: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- prod_year: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- leather_interior: string (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- engine_volume: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- cylinders: double (nullable = true)
 |-- gear_box_type: string (nullable = true)
 |-- drive_wheels: string (nullable = true)
 |-- doors: string (nullable = true)
 |-- wheel: string (nullable = true)
 |-- color: string (nullable = true)
 |-- airbags: integer (nullable = true)



[Row(id=45654403, price=13328, levy='1399', manufacturer='LEXUS', model='RX 450', prod_year=2010, category='Jeep', leather_interior='Yes', fuel_type='Hybrid', engine_volume='3.5', mileage='186005 km', cylinders=6.0, gear_box_type='Automatic', drive_wheels='4x4', doors='04-May', wheel='Left wheel', color='Silver', airbags=12),
 Row(id=44731507, price=16621, levy='1018', manufacturer='CHEVROLET', model='Equinox', prod_year=2011, category='Jeep', leather_interior='No', fuel_type='Petrol', engine_volume='3', mileage='192000 km', cylinders=6.0, gear_box_type='Tiptronic', drive_wheels='4x4', doors='04-May', wheel='Left wheel', color='Black', airbags=8),
 Row(id=45774419, price=8467, levy='-', manufacturer='HONDA', model='FIT', prod_year=2006, category='Hatchback', leather_interior='No', fuel_type='Petrol', engine_volume='1.3', mileage='200000 km', cylinders=4.0, gear_box_type='Variator', drive_wheels='Front', doors='04-May', wheel='Right-hand drive', color='Black', airbags=2),
 Row(id=457691

In [5]:
from pyspark.sql.functions import regexp_replace, col

# Change format of colum
df = df.withColumn(
    "mileage", 
    regexp_replace(col("mileage"), " km", "").cast("int")
)

df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- levy: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- prod_year: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- leather_interior: string (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- engine_volume: string (nullable = true)
 |-- mileage: integer (nullable = true)
 |-- cylinders: double (nullable = true)
 |-- gear_box_type: string (nullable = true)
 |-- drive_wheels: string (nullable = true)
 |-- doors: string (nullable = true)
 |-- wheel: string (nullable = true)
 |-- color: string (nullable = true)
 |-- airbags: integer (nullable = true)



In [6]:
from pyspark.ml.feature import StringIndexer

# Colums will be deleted
columns_to_drop = []


for column_name, column_type in df.dtypes:
    
    # Check is colum is string
    if column_type == "string":
        # New name indexed colum
        indexed_column_name = f"{column_name}I"
        
        # Create StringIndexer
        indexer = StringIndexer(inputCol=column_name, outputCol=indexed_column_name)
        
        # Apply StringIndexer to DataFrame
        df = indexer.fit(df).transform(df)
        
        # Add colum to list
        columns_to_drop.append(column_name)

# Delete old columns
df = df.drop(*columns_to_drop)


df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- prod_year: integer (nullable = true)
 |-- mileage: integer (nullable = true)
 |-- cylinders: double (nullable = true)
 |-- airbags: integer (nullable = true)
 |-- levyI: double (nullable = false)
 |-- manufacturerI: double (nullable = false)
 |-- modelI: double (nullable = false)
 |-- categoryI: double (nullable = false)
 |-- leather_interiorI: double (nullable = false)
 |-- fuel_typeI: double (nullable = false)
 |-- engine_volumeI: double (nullable = false)
 |-- gear_box_typeI: double (nullable = false)
 |-- drive_wheelsI: double (nullable = false)
 |-- doorsI: double (nullable = false)
 |-- wheelI: double (nullable = false)
 |-- colorI: double (nullable = false)



In [7]:
# Moving price colum

cols = [col for col in df.columns if col != "price"] + ["price"]
df = df.select(*cols)

df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- prod_year: integer (nullable = true)
 |-- mileage: integer (nullable = true)
 |-- cylinders: double (nullable = true)
 |-- airbags: integer (nullable = true)
 |-- levyI: double (nullable = false)
 |-- manufacturerI: double (nullable = false)
 |-- modelI: double (nullable = false)
 |-- categoryI: double (nullable = false)
 |-- leather_interiorI: double (nullable = false)
 |-- fuel_typeI: double (nullable = false)
 |-- engine_volumeI: double (nullable = false)
 |-- gear_box_typeI: double (nullable = false)
 |-- drive_wheelsI: double (nullable = false)
 |-- doorsI: double (nullable = false)
 |-- wheelI: double (nullable = false)
 |-- colorI: double (nullable = false)
 |-- price: integer (nullable = true)



In [8]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.sql.functions import col
from pyspark.ml.functions import vector_to_array

# Join all columns in vector
assembler = VectorAssembler(inputCols=df.columns, outputCol="features")
df_vector = assembler.transform(df)

# Apply MinMaxScaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(df_vector)
df_scaled = scaler_model.transform(df_vector)


# Break the scaled vector into individual columns
df_normalized = df_scaled.withColumn("scaled_array", vector_to_array("scaled_features"))


# Create new columns for new normalized values
for idx, col_name in enumerate(df.columns):
    df_normalized = df_normalized.withColumn(f"{col_name}", col("scaled_array")[idx])

# Paso 4: Select only normalized colums
scaled_columns = [f"{col}" for col in df.columns]
df_final = df_normalized.select(*[col(c).alias(c) for c in scaled_columns])

# Mostrar resultado
df_final.head(3)

[Row(id=0.9935280230288475, prod_year=0.8765432098765431, mileage=8.661532778600945e-05, cylinders=0.3333333333333333, airbags=0.75, levyI=0.02867383512544803, manufacturerI=0.09375, modelI=0.013845185651353053, categoryI=0.1, leather_interiorI=0.0, fuel_typeI=0.3333333333333333, engine_volumeI=0.04716981132075471, gear_box_typeI=0.0, drive_wheelsI=0.5, doorsI=0.0, wheelI=0.0, colorI=0.13333333333333333, price=0.0005065855937122719),
 Row(id=0.956714926907598, prod_year=0.8888888888888888, mileage=8.94069672047193e-05, cylinders=0.3333333333333333, airbags=0.5, levyI=0.04659498207885305, manufacturerI=0.0625, modelI=0.07048458149779736, categoryI=0.1, leather_interiorI=1.0, fuel_typeI=0.0, engine_volumeI=0.0660377358490566, gear_box_typeI=0.3333333333333333, drive_wheelsI=0.5, doorsI=0.0, wheelI=0.0, colorI=0.0, price=0.0006317590281006948),
 Row(id=0.9983153019249396, prod_year=0.8271604938271604, mileage=9.313225750491594e-05, cylinders=0.2, airbags=0.125, levyI=0.0, manufacturerI=0.

In [9]:
# Eliminar la columna "id"
df= df_final.drop("id")
df.printSchema()

root
 |-- prod_year: double (nullable = true)
 |-- mileage: double (nullable = true)
 |-- cylinders: double (nullable = true)
 |-- airbags: double (nullable = true)
 |-- levyI: double (nullable = true)
 |-- manufacturerI: double (nullable = true)
 |-- modelI: double (nullable = true)
 |-- categoryI: double (nullable = true)
 |-- leather_interiorI: double (nullable = true)
 |-- fuel_typeI: double (nullable = true)
 |-- engine_volumeI: double (nullable = true)
 |-- gear_box_typeI: double (nullable = true)
 |-- drive_wheelsI: double (nullable = true)
 |-- doorsI: double (nullable = true)
 |-- wheelI: double (nullable = true)
 |-- colorI: double (nullable = true)
 |-- price: double (nullable = true)



In [25]:
# Slpit de dataset in train and test
train_df, test_df = df.randomSplit([0.5, 0.5], seed=1)

print(train_df.count())

print(test_df.count())



9597
9640


In [24]:
train_df.write.csv("train_updated.csv",header = True)
test_df.write.csv("test_updated.csv",header = True)

#Problem I think I need HADOOP_HOME
#https://github.com/steveloughran/winutils

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/C:/Users/Fernando/Universidad/Asignaturas Montenegro/ISS-AI/src/Spark/train_updated.csv already exists. Set mode as "overwrite" to overwrite the existing path.