In [1]:
!pip install pyspark



In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

SESION DE SPARK

In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

session = SparkSession \
          .builder \
          .appName("Demo Spark") \
          .getOrCreate()

sc = session.sparkContext

CARGA DE DATOS

In [4]:
df_cars = session.read.option('header', 'true') \
    .option('sep', ';') \
    .csv('./data/cars.csv')

In [5]:
# conociendo la data
df_cars.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0| 3449.|        10.5|   70|    US|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows



LIMPIEZA DE DATOS

1. eliminamos filas duplicadas

In [6]:
df_cars.count()

794

In [7]:
df_cars = df_cars.dropDuplicates()

In [8]:
# validamos que se eliminaron todos los datos duplicados
df_cars.count()

406

2. Convierte las columnas numéricas (MPG, Cylinders, Displacement, Horsepower, Weight, Acceleration) a tipos numéricos (FloatType o IntegerType según corresponda).

.Validare primero el tipo de dato que tiene cada columna

In [9]:
df_cars.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: string (nullable = true)
 |-- Cylinders: string (nullable = true)
 |-- Displacement: string (nullable = true)
 |-- Horsepower: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Acceleration: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Origin: string (nullable = true)



In [10]:
df_cars.show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Volkswagen Dasher...|43.4|        4|       90.00|     48.00| 2335.|        23.7|   80|Europe|
|       Datsun 280-ZX|32.7|        6|       168.0|     132.0| 2910.|        11.4|   80| Japan|
|  Mercedes Benz 300d|25.4|        5|       183.0|     77.00| 3530.|        20.1|   79|Europe|
|           Mazda GLC|46.6|        4|       86.00|     65.00| 2110.|        17.9|   80| Japan|
|    Honda Civic 1300|35.1|        4|       81.00|     60.00| 1760.|        16.1|   81| Japan|
|           Ford F250|10.0|        8|       360.0|     215.0| 4615.|        14.0|   70|    US|
|     Volvo 145e (sw)|18.0|        4|       121.0|     112.0| 2933.|        14.5|   72|Europe|
|Volkswagen Super ...|26.0|        4|       97.00|

In [11]:
from pyspark.sql.types import FloatType, IntegerType

In [12]:
from pyspark.sql.functions import col, when

In [13]:
numerical_columns = ["MPG", "Cylinders", "Displacement", "Horsepower", "Weight", "Acceleration"]
for column in numerical_columns:
    df_cars = df_cars.withColumn(column, col(column).cast(FloatType()))

In [14]:
df_cars = df_cars.withColumn("Model", col("Model").cast(IntegerType()))

Valido que los cambios se realizaron correctamente

In [15]:
df_cars.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: float (nullable = true)
 |-- Cylinders: float (nullable = true)
 |-- Displacement: float (nullable = true)
 |-- Horsepower: float (nullable = true)
 |-- Weight: float (nullable = true)
 |-- Acceleration: float (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)



3. Reemplaza los valores 0 de la columna MPG con null para indicar datos faltantes


In [16]:
df_cars = df_cars.withColumn("MPG", when(col("MPG") == 0, None).otherwise(col("MPG")))

TRANSFORMACIONES

1. Agrega una columna nueva llamada Power_to_Weight que calcule la relación entre Horsepower y Weight (Horsepower / Weight).

In [17]:
df_cars = df_cars.withColumn("Power_to_Weight", df_cars.Horsepower / df_cars.Weight)

In [18]:
df_cars.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|     Power_to_Weight|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+
|Volkswagen Dasher...|43.4|      4.0|        90.0|      48.0|2335.0|        23.7|   80|Europe|0.020556745182012847|
|       Datsun 280-ZX|32.7|      6.0|       168.0|     132.0|2910.0|        11.4|   80| Japan| 0.04536082474226804|
|  Mercedes Benz 300d|25.4|      5.0|       183.0|      77.0|3530.0|        20.1|   79|Europe| 0.02181303116147309|
|           Mazda GLC|46.6|      4.0|        86.0|      65.0|2110.0|        17.9|   80| Japan|0.030805687203791468|
|    Honda Civic 1300|35.1|      4.0|        81.0|      60.0|1760.0|        16.1|   81| Japan| 0.03409090909090909|
+--------------------+----+---------+------------+----------+------+----

2. Agrega una columna llamada Vehicle_Age que calcule cuántos años tiene el vehículo basado en el año actual (Model).

In [19]:
from datetime import datetime

In [20]:
from pyspark.sql.functions import lit

In [21]:
# obtenemos el año actual con la funcion datetime
ano_actual = datetime.now().year

In [22]:
# debemos colocar la fecha del modelo en los años 1900 y no dejarlos en 80,90,70 para hacer el calculo de manera correcta
df_cars = df_cars.withColumn("Model", when(col("Model") < 100, col("Model") + 1900).otherwise(col("Model")))

In [23]:
df_cars.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|     Power_to_Weight|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+
|Volkswagen Dasher...|43.4|      4.0|        90.0|      48.0|2335.0|        23.7| 1980|Europe|0.020556745182012847|
|       Datsun 280-ZX|32.7|      6.0|       168.0|     132.0|2910.0|        11.4| 1980| Japan| 0.04536082474226804|
|  Mercedes Benz 300d|25.4|      5.0|       183.0|      77.0|3530.0|        20.1| 1979|Europe| 0.02181303116147309|
|           Mazda GLC|46.6|      4.0|        86.0|      65.0|2110.0|        17.9| 1980| Japan|0.030805687203791468|
|    Honda Civic 1300|35.1|      4.0|        81.0|      60.0|1760.0|        16.1| 1981| Japan| 0.03409090909090909|
+--------------------+----+---------+------------+----------+------+----

In [24]:
# uso lit ya que sera un valor constante
df_cars = df_cars.withColumn("Vehicle_Age", lit(ano_actual) - col("Model").cast("int"))

In [25]:
df_cars.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+-----------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|     Power_to_Weight|Vehicle_Age|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+-----------+
|Volkswagen Dasher...|43.4|      4.0|        90.0|      48.0|2335.0|        23.7| 1980|Europe|0.020556745182012847|         44|
|       Datsun 280-ZX|32.7|      6.0|       168.0|     132.0|2910.0|        11.4| 1980| Japan| 0.04536082474226804|         44|
|  Mercedes Benz 300d|25.4|      5.0|       183.0|      77.0|3530.0|        20.1| 1979|Europe| 0.02181303116147309|         45|
|           Mazda GLC|46.6|      4.0|        86.0|      65.0|2110.0|        17.9| 1980| Japan|0.030805687203791468|         44|
|    Honda Civic 1300|35.1|      4.0|        81.0|      60.0|1760.0|        16.1| 1981| Japan| 0.0340909

3. Clasifica los valores de MPG en una nueva columna llamada Efficiency con las siguientes categorías:

Low: MPG menor a 15.
Medium: MPG entre 15 y 25.
High: MPG mayor a 25.

In [26]:
df_cars = df_cars.withColumn("Efficiency",
                             when(col("MPG") < 15, "Low")
                            .when((col("MPG") > 15) & (col("MPG") <= 25), "Medium")
                            .otherwise("high")
                            )

In [27]:
df_cars.show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+-----------+----------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|     Power_to_Weight|Vehicle_Age|Efficiency|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+-----------+----------+
|Volkswagen Dasher...|43.4|      4.0|        90.0|      48.0|2335.0|        23.7| 1980|Europe|0.020556745182012847|         44|      high|
|       Datsun 280-ZX|32.7|      6.0|       168.0|     132.0|2910.0|        11.4| 1980| Japan| 0.04536082474226804|         44|      high|
|  Mercedes Benz 300d|25.4|      5.0|       183.0|      77.0|3530.0|        20.1| 1979|Europe| 0.02181303116147309|         45|      high|
|           Mazda GLC|46.6|      4.0|        86.0|      65.0|2110.0|        17.9| 1980| Japan|0.030805687203791468|         44|      high|
|    Honda Civic 1300|35.1|

ANALISIS DE DATOS


1. Obtén el promedio de MPG por región (Origin).

In [28]:
from pyspark.sql.functions import avg

In [29]:
df_avg_mpg = df_cars.groupBy("Origin").agg(avg("MPG").alias("Average_MPG"))

In [30]:
df_avg_mpg.show()

+------+------------------+
|Origin|       Average_MPG|
+------+------------------+
|Europe| 27.89142859322684|
|    US| 20.08353414880224|
| Japan|30.450632843790174|
+------+------------------+



2. Obtén el vehículo con el mejor MPG por cada región.

In [31]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

In [37]:
# Definir una ventana que ordene "MPG"
window_spec = Window.partitionBy("Origin").orderBy(col("MPG").desc())

# Agregar una columna con el numero de fila dentro de cada grupo
df_with_rank = df_cars.withColumn("rank", row_number().over(window_spec))

# Filtrar solo las filas con el mejor MPG (rank == 1)
df_best_car = df_with_rank.filter(col("rank") == 1).drop("rank")


In [38]:
df_best_car.show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+-----------+----------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|     Power_to_Weight|Vehicle_Age|Efficiency|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+-----------+----------+
|Volkswagen Rabbit...|44.3|      4.0|        90.0|      48.0|2085.0|        21.7| 1980|Europe| 0.02302158273381295|         44|      high|
|           Mazda GLC|46.6|      4.0|        86.0|      65.0|2110.0|        17.9| 1980| Japan|0.030805687203791468|         44|      high|
|      Plymouth Champ|39.0|      4.0|        86.0|      64.0|1875.0|        16.4| 1981|    US|0.034133333333333335|         43|      high|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------------+-----------+----------+



3. Agrupa los datos por Cylinders y calcula:

El promedio de Horsepower.

El peso promedio.

In [39]:
df_grouped = df_cars.groupBy("Cylinders") \
  .agg(
    avg("Horsepower").alias("avg_Horsepower"),
    avg("Weight").alias("avg_Weight")
)

In [40]:
df_grouped.show()

+---------+------------------+------------------+
|Cylinders|    avg_Horsepower|        avg_Weight|
+---------+------------------+------------------+
|      5.0| 82.33333333333333|3103.3333333333335|
|      3.0|             99.25|            2398.5|
|      6.0|100.29761904761905|3198.2261904761904|
|      8.0| 158.4537037037037| 4105.194444444444|
|      4.0| 76.57487922705315| 2312.685990338164|
+---------+------------------+------------------+



GUARDAR RESULTADOS

Guarda los resultados de los ejercicios anteriores en un archivo CSV:

Guarda el DataFrame limpio en un archivo llamado cleaned_cars.csv.

Guarda los análisis de MPG por región y los vehículos con mejor MPG en un archivo llamado analysis_results.csv.

In [46]:
df_cars.write \
    .option("header", "true") \
    .csv("./cleaned_cars.csv")

In [47]:
df_avg_mpg.write \
    .option("header", "true") \
    .csv("./analysis_results_mpg_by_region.csv")

df_best_car.write \
    .option("header", "true") \
    .csv("./analysis_results_best_cars.csv")