In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m476.3 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=d85d1b429214b98cf63ed2993fb4d107a693e37eb2cb8f60bad810f03834ee8a
  Stored in directory: /home/carlos/.cache/pip/wheels/b1/91/5f/283b53010a8016a4ff1c4a1edd99bbe73afacb099645b5471b
Successfully built pyspark
Installing collected packages: py4j, 

In [4]:
# SparkSession es el punto de entrada para interactuar
# con Spark y realizar operaciones de procesamiento
# de datos. Reemplaza a SparkContext y SQLContext en
# versiones anteriores de Spark.


from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EjemploPySpark") \
    .getOrCreate()

24/06/23 12:08:29 WARN Utils: Your hostname, carlos-HP-EliteBook-850-G3 resolves to a loopback address: 127.0.1.1; using 192.168.1.102 instead (on interface wlp2s0)
24/06/23 12:08:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/23 12:08:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


24/06/23 12:08:45 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [5]:
# DataFrame es una colección distribuida de datos
# organizados en columnas, similar a una
# tabla en una base de datos relacional o
# a un dataframe en pandas. Los DataFrames son inmutables.

data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
columns = ["Nombre", "Edad"]

df = spark.createDataFrame(data, columns)
df.show()

                                                                                

+------+----+
|Nombre|Edad|
+------+----+
| Alice|  34|
|   Bob|  45|
| Cathy|  29|
+------+----+



In [12]:
# RDD es la estructura de datos fundamental de Spark que
# representa una colección distribuida de elementos.
# Es inmutable y puede ser creada a partir de archivos 
# en el sistema de archivos Hadoop (HDFS) o desde colecciones en el programa principal de Python.


rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
print(rdd.collect())

[1, 2, 3, 4, 5]


In [14]:
# Transformaciones: Operaciones que crean un nuevo RDD
# a partir de uno existente, son perezosas 
# (lazy) y no ejecutan el código inmediatamente.

rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.map(lambda x: x * 2)

# Acciones: Operaciones que devuelven un valor al programa principal o exportan datos a
# un sistema de almacenamiento. Ejecutan las transformaciones pendientes en el RDD.

result = rdd2.collect()
print(result)

[2, 4, 6, 8, 10]


In [17]:
# Select: Seleccionar columnas específicas.
print(df.select("Nombre").show())

+------+
|Nombre|
+------+
| Alice|
|   Bob|
| Cathy|
+------+

None


In [18]:
# Filter: Filtrar filas basadas en una condición.
df.filter(df["Edad"] > 30).show()

+------+----+
|Nombre|Edad|
+------+----+
| Alice|  34|
|   Bob|  45|
+------+----+



In [21]:
# GroupBy: Agrupar filas por una columna y realizar agregaciones.
df.groupBy("Nombre").max().show()

+------+---------+
|Nombre|max(Edad)|
+------+---------+
| Alice|       34|
|   Bob|       45|
| Cathy|       29|
+------+---------+



In [22]:
# Join: Combinar DataFrames basados en una clave común.
data2 = [("Alice", "F"), ("Bob", "M"), ("Cathy", "F")]
columns2 = ["Nombre", "Género"]
df2 = spark.createDataFrame(data2, columns2)
df.join(df2, "Nombre").show()



+------+----+------+
|Nombre|Edad|Género|
+------+----+------+
| Alice|  34|     F|
|   Bob|  45|     M|
| Cathy|  29|     F|
+------+----+------+



                                                                                

In [24]:
# Funciones UDF (User Defined Functions)
from pyspark.sql.functions import udf 
from pyspark.sql.types import IntegerType

def suma(x):
    return x + 3

suma_udf = udf(suma, IntegerType())

df.withColumn("Edad + 3", suma(df["Edad"])).show()

+------+----+--------+
|Nombre|Edad|Edad + 3|
+------+----+--------+
| Alice|  34|      37|
|   Bob|  45|      48|
| Cathy|  29|      32|
+------+----+--------+



In [25]:
# Controlar el número de particiones para mejorar el rendimiento.
rdd = rdd.repartition(10)

In [26]:
# Cache/Persist: Almacenar datos en memoria para mejorar el
# rendimiento en operaciones repetidas.
print(f"Cache \n{df.cache()}")

# EXPLAIN
# Muestra el plan de ejecución
print(f"Explain \n{df.explain()}")

spark.stop()

DataFrame[Nombre: string, Edad: bigint]