# ST1803 Using PySpark

Let's set up PySpark in the notebook (mainly aimed at Google Colab) and work through a few warm-up examples.

The setup code is taken from the repository https://github.com/groda/big_data/

## Install Spark
We will use the `pipenv` package manager to install `pyspark`.

In [1]:
!pipenv install pyspark~=3.5.0

Installing pyspark~=3.5.0...
Resolving pyspark~=3.5.0...
✔ Installation Succeeded
Installing pyspark...
Installing dependencies from Pipfile.lock (04efcd)...
To activate this project's virtualenv, run pipenv shell.
Alternatively, run a command inside the virtualenv with pipenv run.


Check that Java 8 or later is installed. Colab already comes with Java (version 11), but in other environments it may need to be installed.

In [8]:
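import os
import shutil

def is_java_installed():
    """Return True if `java` is on the PATH, setting JAVA_HOME from its location."""
    java_path = shutil.which("java")
    if java_path:
        # Resolve symlinks (e.g. /usr/bin/java) to the real binary, .../bin/java,
        # and use the folder two levels above it as JAVA_HOME
        real_path = os.path.realpath(java_path)
        os.environ['JAVA_HOME'] = os.path.dirname(os.path.dirname(real_path))
        return True
    else:
        return False

# Using the function
if is_java_installed():
    print("✅ Java is already installed: {}".format(os.environ['JAVA_HOME']))
else:
    print("❌ Java is not installed. Please install Java.")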


✅ Java is already installed: /usr/lib/jvm/java-11-openjdk-amd64
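The cell above only checks that `java` is on the `PATH`, not that it is Java 8 or later. A minimal sketch for also checking the version, assuming `java -version` prints a line such as `openjdk version "11.0.21" ...` to standard error (the usual format for OpenJDK builds); the helpers `java_major_version` and `installed_java_version` are hypothetical names:

```python
import re
import shutil
import subprocess

def java_major_version(version_string):
    """Return the major Java version from a string like '1.8.0_392' or '11.0.21'."""
    match = re.match(r"(\d+)(?:\.(\d+))?", version_string)
    if match is None:
        raise ValueError(f"Unrecognized Java version: {version_string!r}")
    major = int(match.group(1))
    # Legacy numbering (Java 8 and earlier): '1.8.0_392' means Java 8
    if major == 1 and match.group(2) is not None:
        return int(match.group(2))
    return major

def installed_java_version():
    """Run `java -version` and return the major version, or None if it can't be determined."""
    if shutil.which("java") is None:
        return None
    # `java -version` writes its report to standard error, not standard output
    result = subprocess.run(["java", "-version"], capture_output=True, text=True)
    match = re.search(r'version "([^"]+)"', result.stderr)
    return java_major_version(match.group(1)) if match else None

version = installed_java_version()
if version is None:
    print("❌ Could not determine the Java version.")
elif version < 8:
    print(f"❌ Java {version} found; Spark needs Java 8 or later.")
else:
    print(f"✅ Java {version} found.")
```

The version parsing is kept separate from the `subprocess` call so that it can be checked against known strings without running Java.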


## Example 1: Hello World
Let's start with an application that:

*   Starts a Spark session named `spark`
*   Prints "Hello, World!"
*   Stops the Spark session.

This is a self-contained application (see https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications).



In [10]:
%%writefile HelloWorld.py
"""HelloWorld.py"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Hello World").getOrCreate()

print("Hello, World!")

spark.stop()

Writing HelloWorld.py


To run an application on Spark you can use Python directly, but to pick up Spark's full configuration you should use the `spark-submit` script.

In [11]:
!spark-submit HelloWorld.py

24/02/20 10:42:12 WARN Utils: Your hostname, Camilo resolves to a loopback address: 127.0.1.1; using 172.29.121.68 instead (on interface eth0)
24/02/20 10:42:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/02/20 10:42:14 INFO SparkContext: Running Spark version 3.5.0
24/02/20 10:42:14 INFO SparkContext: OS info Linux, 5.15.133.1-microsoft-standard-WSL2, amd64
24/02/20 10:42:14 INFO SparkContext: Java version 11.0.21
24/02/20 10:42:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/20 10:42:14 INFO ResourceUtils: No custom resources configured for spark.driver.
24/02/20 10:42:14 INFO SparkContext: Submitted application: Hello World
24/02/20 10:42:14 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, scrip

Todo el texto adicional de "Hello, World!" es debido a mensajes logs de la plataforma. Si se quiere tener estos logs aparte los puedo llegar a un archivo (por defecto van al stream estándar de errores).

In [12]:
!spark-submit HelloWorld.py 2>log.txt

Hello, World!
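The `2>log.txt` redirection can also be done from Python with the standard `subprocess` module, which is handy outside a notebook. A minimal sketch; `run_with_separate_logs` is a hypothetical helper, and the demo uses a small Python command as a stand-in for `spark-submit` so that it runs even where Spark is not installed:

```python
import subprocess
import sys

def run_with_separate_logs(cmd, log_path):
    """Run cmd, return its standard output, and write its standard error to log_path."""
    with open(log_path, "w") as log_file:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=log_file, text=True)
    return result.stdout

# Stand-in command: prints a result to stdout and a "log line" to stderr, as spark-submit does
demo_cmd = [sys.executable, "-c",
            "import sys; print('Hello, World!'); print('WARN demo log line', file=sys.stderr)"]
print(run_with_separate_logs(demo_cmd, "demo_log.txt"), end="")
```

With Spark available, the same call would be `run_with_separate_logs(["spark-submit", "HelloWorld.py"], "log.txt")`.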


Now the logs are kept separately and can be inspected in the file `log.txt`.

In [13]:
!cat log.txt

24/02/20 10:42:44 WARN Utils: Your hostname, Camilo resolves to a loopback address: 127.0.1.1; using 172.29.121.68 instead (on interface eth0)
24/02/20 10:42:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/02/20 10:42:48 INFO SparkContext: Running Spark version 3.5.0
24/02/20 10:42:48 INFO SparkContext: OS info Linux, 5.15.133.1-microsoft-standard-WSL2, amd64
24/02/20 10:42:48 INFO SparkContext: Java version 11.0.21
24/02/20 10:42:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/20 10:42:48 INFO ResourceUtils: No custom resources configured for spark.driver.
24/02/20 10:42:48 INFO SparkContext: Submitted application: Hello World
24/02/20 10:42:48 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, scrip

However, running the application felt very slow. Most of the delay comes from the Java Virtual Machine (JVM), which must start first so that the Spark engine can run on top of it. Let's measure how long the application takes to run.

In [14]:
%time !spark-submit HelloWorld.py 2>log.txt

Hello, World!
CPU times: user 83.6 ms, sys: 9.69 ms, total: 93.3 ms
Wall time: 4.71 s
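To get a rough idea of how much of those 4.71 s is just the JVM starting, we can time a bare `java -version` and compare it with starting the Python interpreter. A minimal sketch; `wall_time` is a hypothetical helper, and the numbers depend heavily on the machine:

```python
import shutil
import subprocess
import sys
import time

def wall_time(cmd):
    """Return the elapsed wall-clock seconds to run cmd to completion (output discarded)."""
    start = time.perf_counter()
    subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)
    return time.perf_counter() - start

print(f"Python startup: {wall_time([sys.executable, '-c', 'pass']):.2f} s")
if shutil.which("java"):
    print(f"JVM startup (java -version): {wall_time(['java', '-version']):.2f} s")
else:
    print("java not found on PATH")
```

If the bare JVM starts in a fraction of the total, most of the remaining time is likely spent by Spark setting itself up (the `SparkContext` lines in the log) rather than by the application itself.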


## Examples in PySpark
PySpark ships with many examples in its installation; to find them, we need to know where PySpark was installed.

In [16]:
!pip show pyspark

Name: pyspark
Version: 3.5.0
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /home/camilo/.local/share/virtualenvs/MMDS-OQFbRnK5/lib/python3.11/site-packages
Requires: py4j
Required-by: 


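The `Location` field shows where the package was installed (here, `/home/camilo/.local/share/virtualenvs/MMDS-OQFbRnK5/lib/python3.11/site-packages`; in Colab it is usually `/usr/local/lib/python3.10/dist-packages`), and inside its `pyspark` folder we look for the `examples` folder. Another way to find the installation folder is to run the `find_spark_home.py` script and store its output in an environment variable.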

In [17]:
!find_spark_home.py

/home/camilo/.local/share/virtualenvs/MMDS-OQFbRnK5/lib/python3.11/site-packages/pyspark


In [19]:
import subprocess

In [20]:
pyspark_folder = subprocess.run(["find_spark_home.py"], capture_output=True, text=True)
print("Carpeta de PySpark en:", pyspark_folder.stdout)
# Store the result in an environment variable
os.environ['SPARK_HOME'] = pyspark_folder.stdout.strip()

Carpeta de PySpark en: /home/camilo/.local/share/virtualenvs/MMDS-OQFbRnK5/lib/python3.11/site-packages/pyspark
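A third option, using only the standard library, is to ask Python's import system where a package lives with `importlib.util.find_spec`. This sketches the idea and is not necessarily what `find_spark_home.py` does internally (it may, for example, honor an existing `SPARK_HOME` first); `package_dir` is a hypothetical helper:

```python
import importlib.util
import os

def package_dir(name):
    """Return the folder of an installed package, or None if it cannot be found."""
    spec = importlib.util.find_spec(name)
    if spec is None or spec.origin is None:
        return None
    # For a regular package, spec.origin points to its __init__.py
    return os.path.dirname(spec.origin)

print(package_dir("pyspark"))
```

This avoids starting a subprocess, but it only finds the copy of PySpark visible to the current Python interpreter.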



In [21]:
!ls -p $SPARK_HOME

__init__.py	    install.py		      sbin/
__pycache__/	    instrumentation_utils.py  serializers.py
_globals.py	    jars/		      shell.py
_typing.pyi	    java_gateway.py	      shuffle.py
accumulators.py     join.py		      sql/
bin/		    licenses/		      statcounter.py
broadcast.py	    ml/			      status.py
cloudpickle/	    mllib/		      storagelevel.py
conf.py		    pandas/		      streaming/
context.py	    profiler.py		      taskcontext.py
daemon.py	    py.typed		      testing/
data/		    python/		      traceback_utils.py
errors/		    rdd.py		      util.py
examples/	    rddsampler.py	      version.py
files.py	    resource/		      worker.py
find_spark_home.py  resultiterable.py	      worker_util.py


On Ubuntu we can install the `tree` tool to view the folder structure more clearly.

In [27]:
#!apt install tree
#!sudo apt install tree



In [26]:
# All examples
!tree -I "__pycache__" $SPARK_HOME/examples

/home/camilo/.local/share/virtualenvs/MMDS-OQFbRnK5/lib/python3.11/site-packages/pyspark/examples
└── src
    └── main
        └── python
            ├── __init__.py
            ├── als.py
            ├── avro_inputformat.py
            ├── kmeans.py
            ├── logistic_regression.py
            ├── ml
            │   ├── aft_survival_regression.py
            │   ├── als_example.py
            │   ├── binarizer_example.py
            │   ├── bisecting_k_means_example.py
            │   ├── bucketed_random_projection_lsh_example.py
            │   ├── bucketizer_example.py
            │   ├── chi_square_test_example.py
            │   ├── chisq_selector_example.py
            │   ├── correlation_example.py
            │   ├── count_vectorizer_example.py
            │   ├── cross_validator.py
            │   ├── dataframe_example.py
            │   ├── dct_example.py
            │   ├── decision_tree_classification_example
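If `tree` cannot be installed (for instance, without `sudo` rights), a rough substitute in plain Python lists the example scripts with `pathlib`. A minimal sketch; `list_python_files` is a hypothetical helper:

```python
import os
from pathlib import Path

def list_python_files(root):
    """Return the .py files under root as sorted relative paths, skipping __pycache__."""
    root = Path(root)
    if not root.is_dir():
        return []
    files = []
    for path in root.rglob("*.py"):
        relative = path.relative_to(root)
        # Same effect as tree's -I "__pycache__" option, restricted to .py files
        if "__pycache__" not in relative.parts:
            files.append(relative.as_posix())
    return sorted(files)

examples = os.path.join(os.environ.get("SPARK_HOME", "."), "examples")
for name in list_python_files(examples)[:10]:
    print(name)
```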

In [28]:
# All example datasets
!tree $SPARK_HOME/data

/home/camilo/.local/share/virtualenvs/MMDS-OQFbRnK5/lib/python3.11/site-packages/pyspark/data
├── artifact-tests
│   └── crc
│       ├── junitLargeJar.txt
│       └── smallJar.txt
├── graphx
│   ├── followers.txt
│   └── users.txt
├── mllib
│   ├── als
│   │   ├── sample_movielens_ratings.txt
│   │   └── test.data
│   ├── gmm_data.txt
│   ├── images
│   │   ├── license.txt
│   │   └── origin
│   │       ├── kittens
│   │       │   └── not-image.txt
│   │       └── license.txt
│   ├── kmeans_data.txt
│   ├── pagerank_data.txt
│   ├── pic_data.txt
│   ├── ridge-data
│   │   └── lpsa.data
│   ├── sample_binary_classification_data.txt
│   ├── sample_fpgrowth.txt
│   ├── sample_isotonic_regression_libsvm_data.txt
│   ├── sample_kmeans_data.txt
│   ├── sample_lda_data.txt
│   ├── sample_lda_libsvm_data.txt
│   ├── sample_libsvm_data.txt
│   ├── sample_linear_regress

## Example 2: Word count
We saw that PySpark ships with examples, including `wordcount.py`, but we don't have a decent text dataset. Let's download Don Quixote (the English translation from Project Gutenberg), analyze it, and build our own word counter.

In [29]:
!wget https://www.gutenberg.org/cache/epub/996/pg996.txt -O don_quixote.txt

--2024-02-20 10:47:43--  https://www.gutenberg.org/cache/epub/996/pg996.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47, 2610:28:3090:3000:0:bad:cafe:47
Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2391728 (2.3M) [text/plain]
Saving to: ‘don_quixote.txt’


2024-02-20 10:47:44 (3.84 MB/s) - ‘don_quixote.txt’ saved [2391728/2391728]



In [30]:
!head -10 don_quixote.txt

The Project Gutenberg eBook of Don Quixote
    
This ebook is for the use of anyone anywhere in the United States and
most other parts of the world at no cost and with almost no restrictions
whatsoever. You may copy it, give it away or re-use it under the terms
of the Project Gutenberg License included with this ebook or online
at www.gutenberg.org. If you are not located in the United States,
you will have to check the laws of the country where you are located
before using this eBook.



In [31]:
# Copy into current folder
!cp $SPARK_HOME/examples/src/main/python/wordcount.py ./

In [32]:
# wordcount.py but without comments
!sed -n 18,42p wordcount.py

import sys
from operator import add

from pyspark.sql import SparkSession


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()


In [35]:
# Run wordcount.py with output (1: standard output) and error (2: error output) files
%time !spark-submit wordcount.py don_quixote.txt 1>out.txt 2>err.txt

CPU times: user 159 ms, sys: 40.3 ms, total: 200 ms
Wall time: 10.4 s


In [34]:
!head out.txt

The: 846
Project: 80
Gutenberg: 23
eBook: 4
of: 12866
Don: 2541
Quixote: 1012
: 8413
This: 97
ebook: 2
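The `: 8413` line is the empty string: `split(' ')` splits on every single space, so blank lines and runs of consecutive spaces produce empty tokens, which are counted as a "word" too. A plain-Python sketch of the same `flatMap` → `map` → `reduceByKey(add)` pipeline on a few made-up sample lines shows this, and also how `split()` with no argument avoids it; `word_count` is a hypothetical helper:

```python
from collections import defaultdict

def word_count(lines, sep=' '):
    """Plain-Python equivalent of the flatMap -> map -> reduceByKey(add) pipeline."""
    # flatMap: split every line and flatten the pieces into a single list of words
    words = [word for line in lines for word in line.split(sep)]
    # map: pair each word with a count of 1
    pairs = [(word, 1) for word in words]
    # reduceByKey(add): add up the 1s for each distinct key
    counts = defaultdict(int)
    for word, one in pairs:
        counts[word] += one
    return dict(counts)

sample = ["The Project Gutenberg eBook of Don Quixote", "", "of  the world"]
print(word_count(sample))            # includes '' from the blank line and the double space
print(word_count(sample, sep=None))  # split() on any whitespace: no empty tokens
```

Note also that `The` and `the` are separate keys, since the counter is case-sensitive. In the Spark version, the equivalent change would be calling `x.split()` inside `flatMap`.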


To work interactively, you can use Python directly in the notebook or run the scripts with the `python` command, but part of Spark's logging configuration and other environment settings will be ignored (`spark-submit` is responsible for setting up Spark's environment variables).

Let's try it anyway:

In [36]:
from pyspark.sql import SparkSession
from operator import add

spark = SparkSession.builder.appName('PythonWordCount').getOrCreate()
lines = spark.read.text('don_quixote.txt').rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
output = counts.collect()
spark.stop()

24/02/20 10:58:50 WARN Utils: Your hostname, Camilo resolves to a loopback address: 127.0.1.1; using 172.29.121.68 instead (on interface eth0)
24/02/20 10:58:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/20 10:58:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [37]:
output[:20]

[('The', 846),
 ('Project', 80),
 ('Gutenberg', 23),
 ('eBook', 4),
 ('of', 12866),
 ('Don', 2541),
 ('Quixote', 1012),
 ('', 8413),
 ('This', 97),
 ('ebook', 2),
 ('is', 3504),
 ('for', 4535),
 ('the', 20933),
 ('use', 64),
 ('anyone', 82),
 ('anywhere', 10),
 ('in', 6864),
 ('United', 15),
 ('States', 8),
 ('and', 16604)]

## MISSION: Count words

Your mission, should you choose to accept it, is to change the word counter so that it shows how many words start with each letter, ignoring case.
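Before moving to Spark, it can help to pin down the logic on a few lines in plain Python. A minimal sketch, assuming a "word" is any whitespace-separated token whose first character is a letter; `first_letter_counts` and the sample lines are hypothetical:

```python
from collections import Counter

def first_letter_counts(lines):
    """Count words by their first letter, ignoring case (plain-Python sketch)."""
    counts = Counter()
    for line in lines:
        # split() with no argument skips runs of whitespace, so there are no empty tokens
        for word in line.split():
            first = word[0]
            # Keep only alphabetic first characters; lower() merges 'D' and 'd'
            if first.isalpha():
                counts[first.lower()] += 1
    return dict(sorted(counts.items()))

sample = ["Don Quixote of La Mancha", "(the ingenious gentleman) don"]
print(first_letter_counts(sample))
# → {'d': 2, 'g': 1, 'i': 1, 'l': 1, 'm': 1, 'o': 1, 'q': 1}
```

Note that `(the` is skipped because it starts with `(`, the same punctuation problem the attempts in this section run into. Also, `isalpha()` accepts accented letters such as `á`, so an extra `isascii()` check is needed to keep only `a` to `z`.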

In [86]:
# YOUR CODE HERE
from pyspark.sql import SparkSession
from operator import add

spark = SparkSession.builder.appName('PythonWordCount').getOrCreate()
lines = spark.read.text('don_quixote.txt').rdd.map(lambda r: r[0])
# First attempt: key = lowercased first character of every non-empty token
counts = lines.flatMap(lambda x: x.split(' ')) \
                  .filter(lambda x: len(x) > 0) \
                  .map(lambda x: (x[0].lower(), 1)) \
                  .reduceByKey(add)
output = sorted(counts.collect())
spark.stop()

                                                                                

In [87]:
output[:40]

[('#', 1),
 ('$', 1),
 ('&', 1),
 ('(', 570),
 ('*', 4),
 ('-', 3),
 ('.', 1),
 ('1', 90),
 ('2', 10),
 ('3', 5),
 ('4', 3),
 ('5', 5),
 ('6', 3),
 ('7', 1),
 ('8', 2),
 ('9', 3),
 ('[', 1),
 ('_', 84),
 ('a', 49404),
 ('b', 18470),
 ('c', 14799),
 ('d', 14157),
 ('e', 6803),
 ('f', 16010),
 ('g', 8638),
 ('h', 32160),
 ('i', 28519),
 ('j', 960),
 ('k', 3256),
 ('l', 10090),
 ('m', 18921),
 ('n', 9470),
 ('o', 27047),
 ('p', 10890),
 ('q', 3104),
 ('r', 8265),
 ('s', 30620),
 ('t', 70652),
 ('u', 4277),
 ('v', 2595)]

In [84]:
# YOUR CODE HERE
from pyspark.sql import SparkSession
from operator import add

# Characters deleted from every word before taking its first letter
# (the same set as a chain of .replace(char, '') calls)
REMOVE_CHARS = str.maketrans('', '', ',#$&(*-.1234567[_"@')

spark = SparkSession.builder.appName('PythonWordCount').getOrCreate()
lines = spark.read.text('don_quixote.txt').rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: x.translate(REMOVE_CHARS)) \
                  .filter(lambda x: x and x[0].isalpha() and x[0].isascii()) \
                  .map(lambda x: (x[0].lower(), 1)) \
                  .reduceByKey(add)
output = sorted(counts.collect())
spark.stop()

                                                                                

In [85]:
output

[('a', 49458),
 ('b', 18482),
 ('c', 14805),
 ('d', 14167),
 ('e', 6829),
 ('f', 16087),
 ('g', 8641),
 ('h', 32163),
 ('i', 28536),
 ('j', 962),
 ('k', 3468),
 ('l', 10096),
 ('m', 18929),
 ('n', 9476),
 ('o', 27060),
 ('p', 10903),
 ('q', 3106),
 ('r', 8270),
 ('s', 30631),
 ('t', 70669),
 ('u', 4278),
 ('v', 2596),
 ('w', 29797),
 ('x', 166),
 ('y', 4531),
 ('z', 111),
 ('à', 1),
 ('á', 21),
 ('æ', 11),
 ('é', 1),
 ('í', 1),
 ('ú', 6)]
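Removing characters this way deletes them anywhere in a word, although only the leading ones affect the first letter. A more compact alternative, sketched here as an assumption and not as the notebook's solution, strips leading punctuation and digits with `str.lstrip` and returns the first remaining character if it is a letter; `first_letter` is a hypothetical helper:

```python
import string

# Characters to skip at the start of a word: ASCII punctuation and digits
LEADING_JUNK = string.punctuation + string.digits

def first_letter(word):
    """Return the lowercase first letter of word after stripping leading junk, or None."""
    stripped = word.lstrip(LEADING_JUNK)
    if stripped and stripped[0].isalpha():
        return stripped[0].lower()
    return None

for word in ["(the", "_Don", "1605", "Quixote,", ""]:
    print(repr(word), "->", first_letter(word))
```

In the Spark pipeline it could replace the cleaning and filtering steps, e.g. `lines.flatMap(lambda x: x.split()).map(first_letter).filter(lambda c: c is not None).map(lambda c: (c, 1)).reduceByKey(add)`. The counts would differ somewhat from the ones above because the cleaning rule is different, and `string.punctuation` only covers ASCII punctuation.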

In [88]:
# YOUR CODE HERE
from pyspark.sql import SparkSession
from operator import add

spark = SparkSession.builder.appName('PythonWordCount').getOrCreate()
lines = spark.read.text('don_quixote.txt').rdd.map(lambda r: r[0])
# Keep only words whose first character is an ASCII letter, then count by lowercase letter
counts = lines.flatMap(lambda x: x.split(' ')) \
                  .filter(lambda x: x and x[0].isalpha() and x[0].isascii()) \
                  .map(lambda x: (x[0].lower(), 1)) \
                  .reduceByKey(add)
output = sorted(counts.collect())
spark.stop()

                                                                                

In [89]:
output

[('a', 49404),
 ('b', 18470),
 ('c', 14799),
 ('d', 14157),
 ('e', 6803),
 ('f', 16010),
 ('g', 8638),
 ('h', 32160),
 ('i', 28519),
 ('j', 960),
 ('k', 3256),
 ('l', 10090),
 ('m', 18921),
 ('n', 9470),
 ('o', 27047),
 ('p', 10890),
 ('q', 3104),
 ('r', 8265),
 ('s', 30620),
 ('t', 70652),
 ('u', 4277),
 ('v', 2595),
 ('w', 29767),
 ('x', 166),
 ('y', 4531),
 ('z', 111)]