# Introducción a PySpark

### Cargando el entorno de PySpark en Google Colab

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz
!tar xf spark-3.5.6-bin-hadoop3.tgz
!pip install -q findspark

tar: spark-3.5.6-bin-hadoop3.tgz: Cannot open: No such file or directory
tar: Error is not recoverable: exiting now


In [2]:
import os
os.enciron["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.6-bin-hadoop3"

AttributeError: module 'os' has no attribute 'enciron'

In [1]:
!ls

"ls" no se reconoce como un comando interno o externo,
programa o archivo por lotes ejecutable.


In [None]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
spark = SparkSession.builder \
                    .master("local[*]") \
                    .appName("Ejemplo") \
                    .getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

### Importando una base de datos externa

In [None]:
!wget https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv

### Creando un Dataframe a partir de un archivo "csv" de entrada

In [None]:
df = spark.read.csv("cars.csv", header=True, sep=";", inferSchema=True)
df.show(10)

In [None]:
# Imprimiendo el número de registros en el Dataframe
df.count()

In [None]:
# Imprimiendo el número de columnas en el Dataframe
print(len(df.columns))

### Imprimiendo información sobre el tipo de datos y esquema del Dataframe

In [None]:
# Imprimiendo los tipos de datos del Dataframe
df.dtypes

In [None]:
# Imprimiendo el esquema del Dataframe
df.printSchema()

### Cambiando el esquema del Dataframe

In [None]:
# Si se requiere cambiar el tipo de alguna columna, se puede usar withColumn
from pyspark.sql.types import IntegerType, FloatType

# Dos formas en las cuales se puede cambiar el formato de la columna MPG
df = df.withColumn('MPG', df['MPG'].cast(FloatType()))

df.printSchema()

In [None]:
# Si fuera necesario, se puede cambiar el tipo a todas las columnas del Dataframe usando selectExpr
df2 = df.selectExpr(
    'cast(Car as string) Car',
    'cast(MPG as float) MPG',
    'cast(Cylinders as int) Cylinders',
    'cast(Displacement as int) Displacement',
    'cast(Horsepower as int) Horsepower',
    'cast(Weight as int) Weight',
    'cast(Acceleration as float) Acceleration',
    'cast(Model as int) Model',
    'cast(Origin as string) Origin'
)

df2.printSchema()

In [None]:
df2.show(10, truncate=False)

### Definiendo un esquema para un Dataframe

In [None]:
# Se especifica un DDL
ddl_schema = "Car STRING, MPG FLOAT, Cylinders INT, Displacement FLOAT, Horsepower FLOAT, Weight FLOAT, Acceleration FLOAT, Model INT, Origin STRING"

df3 = spark.read.csv('cars.csv', header=True, sep=';', schema=ddl_schema)
df3.printSchema()

In [None]:
df3.show(10, truncate=False)

In [None]:
# Si se especificaun DDL que no coincide con el dataset, se pueden obtener lecturas nulas
ddl_schema = "Car STRING, MPG INT, Cylinders INT, Displacement FLOAT, Horsepower FLOAT, Weight FLOAT, Acceleration FLOAT, Model INT, Origin STRING"

df4 = spark.read.csv('cars.csv', header=True, sep=';', schema=ddl_schema)
df4.show(10, truncate=False)

## Transformaciones aplicables a columnas

### 1. Selección de columnas: select

In [None]:
# Para la selección de columnas de un dataframe, se pueden usar variantes en la sintaxis
print("Método 1")
df_car = df.select(df.Car)
df_car.show(10, truncate=False)

print("Método 2")
df_car = df.select(df['Car'])
df_car.show(10, truncate=False)

print("Método 3")
df_car = df.select('Car')
df_car.show(10, truncate=False)

In [None]:
# Existe otra forma de acceder a una columna, usando el módulo "col" de SQL
from pyspark.sql.functions import col

print("Método 4")
df.select(col('car')).show(10, truncate=False)

In [None]:
# Es posible seleccionar varias columnas en una sola transformación
# Nota: se puede mezclar sintaxis

# Selección de columa "Car" y "MPG"
# Método 1
print("Método 1")
df_Car_MPG = df.select(df.Car, df.MPG)
df_Car_MPG.show(10, truncate=False)

# Método 2
print("Método 2")
df_Car_MPG = df.select(df['Car'], df.MPG)
df_Car_MPG.show(10, truncate=False)

# Método 3
print("Método 3")
from pyspark.sql.functions import col
df_Car_MPG = df.select(col('car'), col('mpg'))
df_Car_MPG.show(10, truncate=False)

### 2. Añadiendo columnas: withColumn


In [None]:
# Caso 2: Añadiendo la columna "col_1"
from pyspark.sql.functions import lit
df_newCols = df.withColumn('col_1', lit(1))
# lit se usa para especificar el valor a usar para llenar la columna creada

# Caso 2: Añadiendo dos columnas
df_newCols = df_newCols.withColumn('col_2', lit(2))  \
                       .withColumn('col_3', lit(3))
df_newCols.show(10, truncate=False)

In [None]:
# Caso 3: Añadiendo una nueva columna a partir de una existente
# La nueva columna se llamará 'car_model' generada a partir de las columnas Car y Model
from pyspark.sql.functions import concat
df_newCols = df_newCols.withColumn('car_model', concat(col("Car"), lit(" "), col("Model")))

df_newCols.show(10, truncate=False)
print("# registros:", df_newCols.count())