**Install Dependencies:**


Java 8, Apache Spark with Hadoop, and findspark (used to locate the Spark installation on the system).

In [4]:
!pip install -q pyspark

In [6]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q --show-progress http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark



Loading Dataset

In [7]:
!wget https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv

--2025-02-10 18:31:02--  https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobceles.github.io (jacobceles.github.io)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobceles.github.io (jacobceles.github.io)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv [following]
--2025-02-10 18:31:02--  https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/cars.csv
Resolving jacobcelestine.com (jacobcelestine.com)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jacobcelestine.com (jacobcelestine.com)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 22608 (22K) [text/csv]
Saving to: ‘cars.csv’


2025-02-10 18:31:02 (22.7 MB/s) - ‘cars.csv’ saved [22608/22608]



Load the data from the CSV file into a DataFrame. Columns are separated by semicolons (;).

In [57]:
# Load the CSV file into a Spark DataFrame
df = spark.read.csv("cars.csv", sep=";", header=True, inferSchema=True)

Select columns

*   List columns
*   Select by column name



In [58]:
# List all columns of the DataFrame
df.columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin']

In [59]:
# Select by column name (here, every column in the DataFrame)
columns_to_select = df.columns
df_selected = df.select(*columns_to_select)
df_selected.show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|  3504|        12.0|   70|    US|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|  3693|        11.5|   70|    US|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|  3436|        11.0|   70|    US|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|  3433|        12.0|   70|    US|
|         Ford Torino|17.0|        8|       302.0|     140.0|  3449|        10.5|   70|    US|
|    Ford Galaxie 500|15.0|        8|       429.0|     198.0|  4341|        10.0|   70|    US|
|    Chevrolet Impala|14.0|        8|       454.0|     220.0|  4354|         9.0|   70|    US|
|   Plymouth Fury iii|14.0|        8|       440.0|

Add a new column and populate it with any literal value.

In [61]:
from pyspark.sql.functions import lit
df = df.withColumn("GladisTesting", lit(1))
df.show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+-------------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|GladisTesting|
+--------------------+----+---------+------------+----------+------+------------+-----+------+-------------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|  3504|        12.0|   70|    US|            1|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|  3693|        11.5|   70|    US|            1|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|  3436|        11.0|   70|    US|            1|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|  3433|        12.0|   70|    US|            1|
|         Ford Torino|17.0|        8|       302.0|     140.0|  3449|        10.5|   70|    US|            1|
|    Ford Galaxie 500|15.0|        8|       429.0|     198.0|  4341|        10.0|   70|    US|            1|
|    Chevrolet Impa

Rename the column named 'first_column' to 'new_column_one'

In [62]:
# There is no "first_column" in this dataset, so I am renaming the column I added above to "new_column_one"
df = df.withColumnRenamed("GladisTesting", "new_column_one")
df.show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|new_column_one|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|  3504|        12.0|   70|    US|             1|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|  3693|        11.5|   70|    US|             1|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|  3436|        11.0|   70|    US|             1|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|  3433|        12.0|   70|    US|             1|
|         Ford Torino|17.0|        8|       302.0|     140.0|  3449|        10.5|   70|    US|             1|
|    Ford Galaxie 500|15.0|        8|       429.0|     198.0|  4341|        10.0|   70|    US|             1|
|    Chevr

Group by columns Origin and Model

In [63]:
df_grouped = df.groupBy("Origin", "Model").count()
# Show the result
df_grouped.show()

+------+-----+-----+
|Origin|Model|count|
+------+-----+-----+
|Europe|   71|    5|
|Europe|   80|    9|
|Europe|   79|    4|
| Japan|   75|    4|
|    US|   72|   18|
|    US|   80|    7|
|Europe|   74|    6|
| Japan|   79|    2|
|Europe|   76|    8|
|    US|   75|   20|
| Japan|   77|    6|
|    US|   82|   20|
| Japan|   80|   13|
| Japan|   78|    8|
|    US|   78|   22|
|Europe|   75|    6|
|    US|   71|   20|
|    US|   77|   18|
| Japan|   70|    2|
| Japan|   71|    4|
+------+-----+-----+
only showing top 20 rows



Filter rows where Origin is Europe

In [64]:
df_filtered  = df.filter(df.Origin == "Europe")
# Show the result
df_filtered.show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|new_column_one|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------+
|Citroen DS-21 Pallas| 0.0|        4|       133.0|     115.0|  3090|        17.5|   70|Europe|             1|
|Volkswagen 1131 D...|26.0|        4|        97.0|      46.0|  1835|        20.5|   70|Europe|             1|
|         Peugeot 504|25.0|        4|       110.0|      87.0|  2672|        17.5|   70|Europe|             1|
|         Audi 100 LS|24.0|        4|       107.0|      90.0|  2430|        14.5|   70|Europe|             1|
|            Saab 99e|25.0|        4|       104.0|      95.0|  2375|        17.5|   70|Europe|             1|
|            BMW 2002|26.0|        4|       121.0|     113.0|  2234|        12.5|   70|Europe|             1|
|Volkswage

Show min and max Weight

In [65]:
from pyspark.sql import functions as F
# Compute the minimum and maximum weight
# (F.min / F.max avoid shadowing Python's built-in min and max)
df_weight_stats = df.select(F.min("Weight").alias("Min_Weight"), F.max("Weight").alias("Max_Weight"))
# Show the results
df_weight_stats.show()

+----------+----------+
|Min_Weight|Max_Weight|
+----------+----------+
|      1613|      5140|
+----------+----------+



# Joins

In [66]:
# Create two dataframes
cars_df = spark.createDataFrame([[1, 'Car A'],[2, 'Car B'],[3, 'Car C']], ["id", "car_name"])
car_price_df = spark.createDataFrame([[1, 1000],[2, 2000],[3, 3000]], ["id", "car_price"])
cars_df.show()
car_price_df.show()

+---+--------+
| id|car_name|
+---+--------+
|  1|   Car A|
|  2|   Car B|
|  3|   Car C|
+---+--------+

+---+---------+
| id|car_price|
+---+---------+
|  1|     1000|
|  2|     2000|
|  3|     3000|
+---+---------+



Execute an inner join so we can see the id, name and price of each car in one row

In [67]:
# Run an INNER JOIN on the "id" column
result_df = cars_df.join(car_price_df, on="id", how="inner")

# Show the result
result_df.show()

+---+--------+---------+
| id|car_name|car_price|
+---+--------+---------+
|  1|   Car A|     1000|
|  2|   Car B|     2000|
|  3|   Car C|     3000|
+---+--------+---------+



# Spark SQL


*   Load Cars.csv
*   Register a temporary table
*   Select all data from temp table with a limit



In [69]:
# Load Cars.csv (already loaded into df above)
# Register the temporary table
df.createOrReplaceTempView("cars")

In [70]:
# Show all columns with LIMIT 5
spark.sql("SELECT * FROM cars LIMIT 5").show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|new_column_one|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------+
|Chevrolet Chevell...|18.0|        8|       307.0|     130.0|  3504|        12.0|   70|    US|             1|
|   Buick Skylark 320|15.0|        8|       350.0|     165.0|  3693|        11.5|   70|    US|             1|
|  Plymouth Satellite|18.0|        8|       318.0|     150.0|  3436|        11.0|   70|    US|             1|
|       AMC Rebel SST|16.0|        8|       304.0|     150.0|  3433|        12.0|   70|    US|             1|
|         Ford Torino|17.0|        8|       302.0|     140.0|  3449|        10.5|   70|    US|             1|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------------+

