# ST1803 Using PySpark

We will set up PySpark in the notebook (aimed mainly at Google Colab) and work through a few warm-up examples.

The setup code is taken from the repository https://github.com/groda/big_data/


Check that Java 8 or later is available. Colab provides Java 11, but in other environments it may have to be installed.
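
One way to run that check from Python is sketched below. The helper names `parse_java_major` and `java_major_version` are made up for this illustration, and the parsing assumes the usual `version "..."` banner that `java -version` prints.

```python
import re
import shutil
import subprocess


def parse_java_major(banner):
    """Extract the major version from a `java -version` banner, or None."""
    match = re.search(r'version "(\d+)(?:\.(\d+))?', banner)
    if match is None:
        return None
    major = int(match.group(1))
    # Java 8 and earlier report themselves as 1.8, 1.7, ...
    if major == 1 and match.group(2) is not None:
        major = int(match.group(2))
    return major


def java_major_version():
    """Return the major version of the `java` found on PATH, or None."""
    java = shutil.which("java")
    if java is None:
        return None
    # `java -version` writes its banner to stderr, not stdout.
    proc = subprocess.run([java, "-version"], capture_output=True, text=True)
    return parse_java_major(proc.stderr)


version = java_major_version()
if version is None:
    print("Java not found: install a JDK before using PySpark")
elif version < 8:
    print(f"Java {version} is too old: Java 8 or later is needed")
else:
    print(f"Java {version} found")
```

In a notebook, `!java -version` gives the same information without any parsing.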


## Example 1: Hello World

We start with an application that:

- Starts a Spark session called `spark`
- Prints "Hello, World!"
- Stops the Spark session.

This is a self-contained application (see https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications).


In [1]:
%%writefile HelloWorld.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Hello World").getOrCreate()

print("Hello, World!")

spark.stop()

Overwriting HelloWorld.py


A Spark application can be run directly with Python, but to pick up the full Spark configuration you should use the `spark-submit` script.


In [2]:
!spark-submit HelloWorld.py

24/02/21 18:48:44 WARN Utils: Your hostname, Davids-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.161.51.44 instead (on interface en0)
24/02/21 18:48:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/02/21 18:48:44 INFO SparkContext: Running Spark version 3.5.0
24/02/21 18:48:44 INFO SparkContext: OS info Mac OS X, 14.3.1, aarch64
24/02/21 18:48:44 INFO SparkContext: Java version 20.0.2
24/02/21 18:48:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/21 18:48:45 INFO ResourceUtils: No custom resources configured for spark.driver.
24/02/21 18:48:45 INFO SparkContext: Submitted application: Hello World
24/02/21 18:48:45 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , ven

All the text besides "Hello, World!" comes from the platform's log messages. To keep these logs apart, redirect them to a file (by default they go to the standard error stream).


In [3]:
!spark-submit HelloWorld.py 2>log.txt

Hello, World!


Now the logs are separated and can be reviewed in the `log.txt` file.


In [4]:
!cat log.txt

24/02/21 18:48:46 WARN Utils: Your hostname, Davids-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.161.51.44 instead (on interface en0)
24/02/21 18:48:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/02/21 18:48:47 INFO SparkContext: Running Spark version 3.5.0
24/02/21 18:48:47 INFO SparkContext: OS info Mac OS X, 14.3.1, aarch64
24/02/21 18:48:47 INFO SparkContext: Java version 20.0.2
24/02/21 18:48:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/21 18:48:47 INFO ResourceUtils: No custom resources configured for spark.driver.
24/02/21 18:48:47 INFO SparkContext: Submitted application: Hello World
24/02/21 18:48:47 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , ven
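
The `2>log.txt` redirection works because the program output and the logs travel on two different streams. The sketch below shows the same separation from Python. It uses a small stand-in child process (an assumption, so that it runs without Spark) that prints one line on stdout and one log-like line on stderr.

```python
import subprocess
import sys

# Stand-in for `spark-submit HelloWorld.py`: program output goes to stdout,
# a log-like line goes to stderr.
child_code = (
    "import sys\n"
    "print('Hello, World!')\n"
    "print('WARN Utils: example log line', file=sys.stderr)\n"
)

proc = subprocess.run(
    [sys.executable, "-c", child_code],
    capture_output=True,  # capture stdout and stderr separately
    text=True,
)

print("stdout:", proc.stdout, end="")
print("stderr:", proc.stderr, end="")
```

Where Spark is installed, replacing the command with `["spark-submit", "HelloWorld.py"]` should leave the greeting in `proc.stdout` and the log lines in `proc.stderr`.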

Still, running the application felt slow. Most of the delay comes from the Java Virtual Machine (JVM), which has to start before the Spark engine can run on top of it. Let's see how long the application takes to run.


In [5]:
%time !spark-submit HelloWorld.py 2>log.txt

Hello, World!
CPU times: user 18.6 ms, sys: 8.65 ms, total: 27.2 ms
Wall time: 2.38 s
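
Note that the CPU times above are tiny compared with the wall time: as far as I can tell they cover only the notebook process, which mostly waits, while the wall time includes the JVM start-up in the child process. A minimal sketch of the same measurement in plain Python follows. The command is a stand-in (an assumption) so that the block runs without Spark.

```python
import subprocess
import sys
import time

# Stand-in command; use ["spark-submit", "HelloWorld.py"] where Spark is installed.
command = [sys.executable, "-c", "print('Hello, World!')"]

start = time.perf_counter()
proc = subprocess.run(command, capture_output=True, text=True)
elapsed = time.perf_counter() - start  # wall-clock seconds, child included

print(proc.stdout, end="")
print(f"Wall time: {elapsed:.2f} s")
```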


## Examples in PySpark

PySpark ships with many examples in its installation; to find them we need to know where PySpark was installed.


In [6]:
!pip show pyspark

Name: pyspark
Version: 3.5.0
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /Users/david/.local/share/virtualenvs/spark-grandes-volumenes-sdSE_HtW/lib/python3.11/site-packages
Requires: py4j
Required-by: 


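The `Location` field shows where PySpark was installed: `/usr/local/lib/python3.10/dist-packages` on Colab, or a virtual environment path like the one above on a local machine. From there we need to find the `examples` folder. Another way to locate the installation is to run the `find_spark_home.py` script and use its output to set an environment variable.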


In [7]:
!find_spark_home.py

/Users/david/.local/share/virtualenvs/spark-grandes-volumenes-sdSE_HtW/lib/python3.11/site-packages/pyspark


In [8]:
import os
import subprocess

pyspark_folder = subprocess.run(["find_spark_home.py"], capture_output=True, text=True)
print("PySpark folder at:", pyspark_folder.stdout)
# Store the result in an environment variable
os.environ["SPARK_HOME"] = pyspark_folder.stdout.strip()

PySpark folder at: /Users/david/.local/share/virtualenvs/spark-grandes-volumenes-sdSE_HtW/lib/python3.11/site-packages/pyspark
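
If `find_spark_home.py` is not on the `PATH`, the package folder can also be located with the standard library. This is only a sketch: `package_dir` is a hypothetical helper, and it assumes a pip-installed PySpark, where the package folder doubles as `SPARK_HOME` (as in the output above).

```python
import importlib.util
import os


def package_dir(name):
    """Return the folder a package would be imported from, or None if missing."""
    spec = importlib.util.find_spec(name)
    if spec is None or not spec.submodule_search_locations:
        return None
    return list(spec.submodule_search_locations)[0]


spark_home = package_dir("pyspark")
if spark_home is None:
    print("pyspark is not installed in this environment")
else:
    # Assumption: for a pip install, the package folder is the Spark home.
    os.environ["SPARK_HOME"] = spark_home
    print("PySpark folder at:", spark_home)
```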



In [9]:
!ls -p $SPARK_HOME

__init__.py               install.py                sbin/
__pycache__/              instrumentation_utils.py  serializers.py
_globals.py               jars/                     shell.py
_typing.pyi               java_gateway.py           shuffle.py
accumulators.py           join.py                   sql/
bin/                      licenses/                 statcounter.py
broadcast.py              ml/                       status.py
cloudpickle/              mllib/                    storagelevel.py
conf.py                   pandas/                   streaming/
context.py                profiler.py               taskcontext.py
daemon.py                 py.typed                  testing/
data/                     python/                   traceback_utils.py
errors/

In [10]:
# All examples
!tree -I "__pycache__" $SPARK_HOME/examples

/Users/david/.local/share/virtualenvs/spark-grandes-volumenes-sdSE_HtW/lib/python3.11/site-packages/pyspark/examples
└── src
    └── main
        └── python
            ├── __init__.py
            ├── als.py
            ├── avro_inputformat.py
            ├── kmeans.py
            ├── logistic_regression.py
            ├── ml
            │   ├── aft_survival_regression.py
            │   ├── als_example.py
            │   ├── binarizer_example.py
            │   ├── bisecting_k_means_example.py
            │   ├── bucketed_random_projection_lsh_example.py
            │   ├── bucketizer_example.py
            │   ├── chi_square_test_example.py
            │   ├── chisq_selector_example.py
            │   ├── correlation_example.py
            │   ├── count_vectorizer_example.py
            │   ├

In [11]:
# All example datasets
!tree $SPARK_HOME/data

/Users/david/.local/share/virtualenvs/spark-grandes-volumenes-sdSE_HtW/lib/python3.11/site-packages/pyspark/data
├── artifact-tests
│   └── crc
│       ├── junitLargeJar.txt
│       └── smallJar.txt
├── graphx
│   ├── followers.txt
│   └── users.txt
├── mllib
│   ├── als
│   │   ├── sample_movielens_ratings.txt
│   │   └── test.data
│   ├── gmm_data.txt
│   ├── images
│   │   ├── license.txt
│   │   └── origin
│   │       ├── kittens
│   │       │   └── not-image.txt
│   │       └── license.txt
│   ├── kmeans_data.txt
│   ├── pagerank_data.txt
│   ├── pic_data.txt
│   ├── ridge-data
│   │   └── lpsa.data
│   ├── sample_binary_classification_data.txt
│   ├── sample_fpgrowth.txt
│   ├── sample_isotonic_regression_libsvm_dat
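
Where the `tree` command is not available, a similar listing can be produced with `pathlib`. The sketch below uses a hypothetical helper, `list_files`, and demonstrates it on a throwaway folder so that it runs without Spark. The file names in the demo are placeholders borrowed from the listing above.

```python
import tempfile
from pathlib import Path


def list_files(root, pattern="*"):
    """Return files under `root` matching `pattern`, as sorted relative paths."""
    root = Path(root)
    if not root.is_dir():
        return []
    return sorted(
        path.relative_to(root).as_posix()
        for path in root.rglob(pattern)
        if path.is_file() and "__pycache__" not in path.parts
    )


# Demo on a temporary folder; with Spark installed, pass
# Path(os.environ["SPARK_HOME"]) / "examples" instead.
with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp)
    (demo / "ml").mkdir()
    (demo / "kmeans.py").write_text("")
    (demo / "ml" / "als_example.py").write_text("")
    (demo / "notes.txt").write_text("")
    found = list_files(demo, "*.py")

print(found)
```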

## Example 2: Counting words

We saw that PySpark ships with examples, including `wordcount.py`, but we have no decent text dataset. Let's download Don Quixote to analyze it and build our own word counter.


In [12]:
!wget https://www.gutenberg.org/cache/epub/996/pg996.txt -O don_quixote.txt

--2024-02-21 18:48:52--  https://www.gutenberg.org/cache/epub/996/pg996.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47
Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2391728 (2.3M) [text/plain]
Saving to: ‘don_quixote.txt’


2024-02-21 18:48:53 (2.43 MB/s) - ‘don_quixote.txt’ saved [2391728/2391728]



In [13]:
!head -10 don_quixote.txt

The Project Gutenberg eBook of Don Quixote
    
This ebook is for the use of anyone anywhere in the United States and
most other parts of the world at no cost and with almost no restrictions
whatsoever. You may copy it, give it away or re-use it under the terms
of the Project Gutenberg License included with this ebook or online
at www.gutenberg.org. If you are not located in the United States,
you will have to check the laws of the country where you are located
before using this eBook.



In [14]:
# Copy into current folder
!cp $SPARK_HOME/examples/src/main/python/wordcount.py ./

In [15]:
# wordcount.py but without comments
!sed -n 18,42p wordcount.py

import sys
from operator import add

from pyspark.sql import SparkSession


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()


In [16]:
# Run wordcount.py with output (1: standard output) and error (2: error output) files
!spark-submit wordcount.py don_quixote.txt 1>out.txt 2>err.txt

In [17]:
!head out.txt

The: 846
Project: 80
Gutenberg: 23
eBook: 4
of: 12866
Don: 2541
Quixote: 1012
: 8413
This: 97
ebook: 2
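
The entry with an empty word (`: 8413`) is a side effect of `split(' ')`: blank lines and runs of consecutive spaces produce empty strings. The plain-Python sketch below mimics the `flatMap` / `map` / `reduceByKey` steps on three made-up lines to show where those tokens come from. It illustrates the logic only and is not how Spark executes the job.

```python
from collections import Counter
from itertools import chain

# Made-up sample: a normal line, an empty line, and a line with a double space.
lines = ["The Project Gutenberg eBook of Don Quixote", "", "Don  Quixote"]

# flatMap: split every line on a single space and flatten the pieces.
tokens = list(chain.from_iterable(line.split(" ") for line in lines))

# map + reduceByKey: pair each token with 1 and add the ones per token.
counts = Counter()
for token in tokens:
    counts[token] += 1

print(counts["Don"])  # 2
print(counts[""])     # 2: one from the empty line, one from the double space

# split() with no argument splits on any whitespace and drops empty tokens.
clean_counts = Counter(chain.from_iterable(line.split() for line in lines))
print("" in clean_counts)  # False
```

In the Spark script, changing `x.split(' ')` to `x.split()` should remove the empty entry in the same way.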


To work interactively you can use Python directly in the notebook or run the scripts with the `python` command, but the logging configuration and the other Spark environment variables will be ignored (`spark-submit` takes care of setting the Spark environment variables).

Let's try it anyway:


In [18]:
from pyspark.sql import SparkSession
from operator import add

spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()
lines = spark.read.text("don_quixote.txt").rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
output = counts.collect()
spark.stop()

24/02/21 18:49:00 WARN Utils: Your hostname, Davids-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.161.51.44 instead (on interface en0)
24/02/21 18:49:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/21 18:49:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [19]:
output[:20]

[('The', 846),
 ('Project', 80),
 ('Gutenberg', 23),
 ('eBook', 4),
 ('of', 12866),
 ('Don', 2541),
 ('Quixote', 1012),
 ('', 8413),
 ('This', 97),
 ('ebook', 2),
 ('is', 3504),
 ('for', 4535),
 ('the', 20933),
 ('use', 64),
 ('anyone', 82),
 ('anywhere', 10),
 ('in', 6864),
 ('United', 15),
 ('States', 8),
 ('and', 16604)]
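
Since `output` is an ordinary Python list of `(word, count)` pairs, it can be post-processed without Spark. As a small sketch, the block below ranks the 20 pairs displayed above (copied by hand, so it covers only that slice, not the whole book).

```python
# First 20 (word, count) pairs exactly as displayed above.
sample = [
    ("The", 846), ("Project", 80), ("Gutenberg", 23), ("eBook", 4),
    ("of", 12866), ("Don", 2541), ("Quixote", 1012), ("", 8413),
    ("This", 97), ("ebook", 2), ("is", 3504), ("for", 4535),
    ("the", 20933), ("use", 64), ("anyone", 82), ("anywhere", 10),
    ("in", 6864), ("United", 15), ("States", 8), ("and", 16604),
]

# Drop the empty-string token, then sort by count, largest first.
top = sorted(
    ((word, count) for word, count in sample if word),
    key=lambda pair: pair[1],
    reverse=True,
)[:5]

print(top)
# [('the', 20933), ('and', 16604), ('of', 12866), ('in', 6864), ('for', 4535)]
```

On the full result, the same ranking could be done inside Spark with `counts.takeOrdered(5, key=lambda kv: -kv[1])`, called before `spark.stop()`.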

## MISSION: Counting words


Your mission, should you choose to accept it, is to change the word counter so that it shows how many words start with each letter, ignoring case.
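
As a hint, here is a plain-Python reference you could use to check a Spark solution on a tiny sample. It is a sketch only: `first_letter_counts` is a hypothetical helper, and skipping tokens that do not start with a letter is an assumption, since the mission statement does not say what to do with them.

```python
from collections import Counter


def first_letter_counts(words):
    """Count words by their lowercased first letter, skipping non-letter starts."""
    counts = Counter()
    for word in words:
        # Assumption: empty tokens and tokens starting with digits or
        # punctuation are ignored.
        if word and word[0].isalpha():
            counts[word[0].lower()] += 1
    return counts


sample_words = ["The", "the", "Project", "", "of", "Quixote", "quixotic", "1605"]
print(sorted(first_letter_counts(sample_words).items()))
# [('o', 1), ('p', 1), ('q', 2), ('t', 2)]
```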


In [20]:
# YOUR CODE HERE