# Curso Big Data #2 - SparkSession: crear y leer DataFrames en PySpark

# Librerías

In [None]:
!pip install findspark
!pip install pyspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m888.1 kB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=db3163335d67e848b3ff6e7646cbfeb0665c8e0bfabe0849abd282c7fa864398
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
import findspark  # helps Python locate the Spark installation so pyspark can be imported

findspark.init()

In [None]:
from pyspark.sql import SparkSession

## SparkSession

La *SparkSession*, introducida en Spark 2.0, es el punto de entrada a las funcionalidades de Spark. El objeto SparkSession está disponible de forma predeterminada en la shell interactiva de PySpark, que se abre tecleando **pyspark** en un terminal de nuestro PC:

```bash
C:\Users\errodringer\suscribete>pyspark
```

en la variable **spark**:

![spark-shell](images/spark-shell.PNG "Spark-Shell")  

Aunque *SparkContext* era el punto de entrada antes de Spark 2.0, SparkSession no lo reemplaza por completo: muchas funcionalidades de SparkContext siguen disponibles y se usan en Spark 2.0 y versiones posteriores (accesibles mediante `spark.sparkContext`). Internamente, la SparkSession crea un SparkConf y un SparkContext con la configuración proporcionada.

### 1. Crear la SparkSession

In [None]:
# Local session with 4 worker threads (`local[4]`). These settings are read back
# later with spark.conf.get and spark.sparkContext.getConf().
# NOTE(review): in local mode executors run inside the driver JVM, so
# spark.executor.memory most likely has no effect here — confirm.
spark = (
    SparkSession.builder
    .appName('firstSession')
    .master('local[4]')
    .config('spark.executor.memory', '1g')
    .config("spark.sql.shuffle.partitions", 1)
    .config('spark.driver.memory', '1g')
    .getOrCreate()
)

Todas las configuraciones disponibles en Spark:

https://spark.apache.org/docs/latest/configuration.html

In [None]:
# Display the active SparkSession object (last expression of the cell).
spark

Se puede consultar y modificar en tiempo de ejecución la configuración inicial, a través de `spark.conf`:

In [None]:
# Read back the shuffle-partition value given in the builder ('1').
spark.conf.get("spark.sql.shuffle.partitions")

'1'

In [None]:
# Change the value at runtime through the session's runtime configuration.
spark.conf.set('spark.sql.shuffle.partitions', 2)

In [None]:
# Confirm the runtime change ('2').
spark.conf.get("spark.sql.shuffle.partitions")

'2'

In [None]:
# All (key, value) pairs of the SparkContext configuration. It still lists
# spark.sql.shuffle.partitions = '1': spark.conf.set changed the session's
# runtime config, not this SparkContext configuration.
spark.sparkContext.getConf().getAll()

[('spark.sql.shuffle.partitions', '1'),
 ('spark.driver.port', '42581'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'),
 ('spark.app.name', 'firstSess

Con el comando:

```python
spark.stop()
```

detenemos la aplicación.

In [None]:
# Left commented out on purpose: stopping the session here would break the cells
# below, which keep using `spark`.
# spark.stop()

### 2. Crear tabla

### 2.1 A partir de una lista

In [None]:
# Column names and rows (one tuple per row) used to build df_1 and df_11 below.
columnas = ["id", "nombre", "l"]
lista = [
    (1, "Errodringer", "a"),
    (2, "Paco", "b"),
    (3, "Hola", "c"),
    (4, "Adios", "d"),
]
lista

[(1, 'Errodringer', 'a'),
 (2, 'Paco', 'b'),
 (3, 'Hola', 'c'),
 (4, 'Adios', 'd')]

In [None]:
# Only column names are given, so Spark infers each column's type from the data (id -> long).
df_1 = spark.createDataFrame(data=lista, schema=columnas)

Número total de registros (filas)

In [None]:
# Number of rows in df_1.
df_1.count()

4

Mostramos los primeros *n* registros pasando *n* como argumento de la función:

```
show(n)
```

In [None]:
# Print only the first 2 rows.
df_1.show(n=2)

+---+-----------+---+
| id|     nombre|  l|
+---+-----------+---+
|  1|Errodringer|  a|
|  2|       Paco|  b|
+---+-----------+---+
only showing top 2 rows



Columnas del DataFrame

In [None]:
# Column names of df_1, in order.
df_1.columns

['id', 'nombre', 'l']

Schema del DataFrame

In [None]:
# Tree view of the schema: inferred type and nullability of each column.
df_1.printSchema()

root
 |-- id: long (nullable = true)
 |-- nombre: string (nullable = true)
 |-- l: string (nullable = true)



Resumen del DF

In [None]:
# Per-column summary (count, mean, stddev, min, max); mean/stddev are NULL for string columns.
df_1.describe().show()

+-------+------------------+------+----+
|summary|                id|nombre|   l|
+-------+------------------+------+----+
|  count|                 4|     4|   4|
|   mean|               2.5|  NULL|NULL|
| stddev|1.2909944487358056|  NULL|NULL|
|    min|                 1| Adios|   a|
|    max|                 4|  Paco|   d|
+-------+------------------+------+----+



In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema: `id` becomes a 32-bit integer instead of the `long` Spark inferred for df_1.
# NOTE: the second column is named "name" here, not "nombre" as in df_1.
schema_1 = StructType([
    StructField("id", IntegerType(), nullable=True),
    StructField("name", StringType(), nullable=True),
    StructField("l", StringType(), nullable=True),
])

df_11 = spark.createDataFrame(data=lista, schema=schema_1)

In [None]:
# `id` is now `integer` (from schema_1) instead of the inferred `long`.
df_11.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- l: string (nullable = true)



In [None]:
# Print the rows of df_11 (column names come from schema_1).
df_11.show()

+---+-----------+---+
| id|       name|  l|
+---+-----------+---+
|  1|Errodringer|  a|
|  2|       Paco|  b|
|  3|       Hola|  c|
|  4|      Adios|  d|
+---+-----------+---+



### 2.2 A partir de un csv

In [None]:
# Colab only: mount Google Drive so files under /content/drive/MyDrive can be read.
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# CSV on the mounted Google Drive (run the drive.mount cell above first).
CSV_PATH = '/content/drive/MyDrive/Infraestructura BigData/Tecmilenio.csv'

# header=True: the first row holds the column names.
# inferSchema=True: otherwise every column is read as string, so numeric columns such as
# id and calificacion are compared as text (e.g. describe() reporting min '100', max '97').
df = spark.read.csv(CSV_PATH, sep=',', header=True, inferSchema=True)

In [None]:
# First 5 rows; the default display may truncate long cell values (see next cell).
df.show(n=5)

+---+-------+--------+----------+------------+
| id| nombre|apellido| matricula|calificacion|
+---+-------+--------+----------+------------+
|  1|   John|     Doe|AL02883894|         100|
|  2|  Alice|   Smith|AL02883895|          80|
|  3|Michael| Johnson|AL02883896|          85|
|  4|  Emily|   Brown|AL02883897|          70|
|  5|  David|   Davis|AL02883898|          90|
+---+-------+--------+----------+------------+
only showing top 5 rows



In [None]:
# Same 5 rows, with cell values printed in full (no truncation).
df.show(n=5, truncate=False)

+---+-------+--------+----------+------------+
|id |nombre |apellido|matricula |calificacion|
+---+-------+--------+----------+------------+
|1  |John   |Doe     |AL02883894|100         |
|2  |Alice  |Smith   |AL02883895|80          |
|3  |Michael|Johnson |AL02883896|85          |
|4  |Emily  |Brown   |AL02883897|70          |
|5  |David  |Davis   |AL02883898|90          |
+---+-------+--------+----------+------------+
only showing top 5 rows



In [None]:
# Column types as read from the CSV.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- nombre: string (nullable = true)
 |-- apellido: string (nullable = true)
 |-- matricula: string (nullable = true)
 |-- calificacion: string (nullable = true)



In [None]:
# Number of rows read from the CSV.
df.count()

10

Escritura de un DataFrame. En este caso, a ficheros de tipo *parquet*, un formato de fichero optimizado para trabajar en entornos big data con grandes volúmenes de datos.

In [None]:
# Write df as Parquet into the 'parquet_example' directory, replacing any previous output.
df.write.mode('overwrite').parquet("parquet_example")

Spark escribe un directorio `parquet_example`, que aquí contiene un único fichero de datos porque el DataFrame tiene una sola partición.

### 2.3 A partir de un parquet

In [None]:
# Read the Parquet output back; the column types are stored in the files themselves.
df_p = spark.read.format('parquet').load('parquet_example')

In [None]:
# Should match the row count written above.
df_p.count()

10

In [None]:
# First 2 rows of the data read back from Parquet.
df_p.show(n=2)

+---+------+--------+----------+------------+
| id|nombre|apellido| matricula|calificacion|
+---+------+--------+----------+------------+
|  1|  John|     Doe|AL02883894|         100|
|  2| Alice|   Smith|AL02883895|          80|
+---+------+--------+----------+------------+
only showing top 2 rows



In [None]:
# Summary statistics. If the columns are strings (CSV read without inferSchema),
# min/max compare as text, e.g. '100' sorts before '97'.
df_p.describe().show()

+-------+------------------+------+--------+----------+---------------+
|summary|                id|nombre|apellido| matricula|   calificacion|
+-------+------------------+------+--------+----------+---------------+
|  count|                10|    10|      10|        10|             10|
|   mean|               5.5|  NULL|    NULL|      NULL|           87.3|
| stddev|3.0276503540974917|  NULL|    NULL|      NULL|8.9820809269221|
|    min|                 1| Alice|Anderson|AL02883881|            100|
|    max|                 9| Sarah|  Wilson|AL02883899|             97|
+-------+------------------+------+--------+----------+---------------+



In [None]:
# Collect every row to the driver as a pandas DataFrame. Fine for this small example;
# for large data the result must fit in driver memory.
df_pandas = df_p.toPandas()

In [None]:
# First rows of the pandas copy.
df_pandas.head()

Unnamed: 0,id,nombre,apellido,matricula,calificacion
0,1,John,Doe,AL02883894,100
1,2,Alice,Smith,AL02883895,80
2,3,Michael,Johnson,AL02883896,85
3,4,Emily,Brown,AL02883897,70
4,5,David,Davis,AL02883898,90


In [None]:
# Left commented out: part #3 below calls SparkSession.builder...getOrCreate(),
# which can then reuse this running session.
# spark.stop()

# Curso Big Data #3 - Transformaciones en columnas en PySpark

In [None]:
# Repeated from part #2, presumably so this part can also be run on its own.
import findspark

findspark.init()

In [None]:
from pyspark.sql import SparkSession

### 1. Crear la SparkSession

In [None]:
# getOrCreate() returns the already-running session if one exists (the spark.stop()
# calls above are commented out).
# NOTE(review): in that case the new app name most likely does not apply to the
# existing SparkContext — confirm in the Spark UI.
spark = (
    SparkSession.builder
    .appName('columnsTransform')
    .getOrCreate()
)

### 2. Lectura parquet

In [None]:
# NOTE(review): 'parquet_example' was written in part #2 with the columns id, nombre,
# apellido, matricula, calificacion. The cells below reference Series_title_*, Data_value,
# STATUS and Period, which that file does not contain — point this at the intended dataset.
df = spark.read.format('parquet').load('parquet_example')

In [None]:
# First 2 rows.
df.show(n=2)

In [None]:
# Column names and types of the loaded data.
df.printSchema()

### 3. Operaciones simples sobre columnas

#### 3.1 Selección de columnas

In [None]:
# Select the five Series_title_* columns by name.
# NOTE(review): these columns must exist in `df` (see the note on the parquet read above).
sel_cols = [
    'Series_title_1',
    'Series_title_2',
    'Series_title_3',
    'Series_title_4',
    'Series_title_5',
]
df_sel = df.select(sel_cols)

In [None]:
# First 5 rows of the selection.
df_sel.show(n=5)

Otro método

In [None]:
# Equivalent selection: indexing a DataFrame with a list of names selects those columns.
df[sel_cols].show(n=5)

#### 3.2 Renombrar una columna

In [None]:
# withColumnRenamed returns a new DataFrame; df_sel itself is not modified.
# (It is a silent no-op if 'Series_title_1' is not a column.)
df_sel.withColumnRenamed('Series_title_1', 'ST_1').show(n=5)

In [None]:
# df_sel keeps the original column name: the renamed result above was never assigned.
df_sel.show(n=5)

#### 3.3 Ordenar DF

Ascendente

In [None]:
# Ascending sort on Data_value, showing only that column.
# If Data_value is a string column, this order is lexicographic (see 3.4).
df.select('Data_value').sort('Data_value').show(10)

Descendente

In [None]:
from pyspark.sql import functions as F

In [None]:
# Descending sort on Data_value, showing only that column.
df.select('Data_value').sort(F.desc('Data_value')).show(10)

#### 3.4 Cambiar el tipo de la columna

In [None]:
from pyspark.sql.types import DoubleType, IntegerType, StringType

Creamos una nueva columna con el tipo nuevo

In [None]:
# Add a numeric copy of Data_value so it can be sorted/filtered as a number.
# NOTE(review): despite the "_int" suffix, the column is cast to DoubleType (floating point);
# renaming it would also require updating the later cells that use 'Data_value_int'.
data_value_num = F.col('Data_value').cast(DoubleType())
df = df.withColumn('Data_value_int', data_value_num)

In [None]:
# First 5 rows, now including Data_value_int.
df.show(n=5)

In [None]:
# Data_value_int should appear as double.
df.printSchema()

Ahora sí podemos ordenar correctamente, con orden numérico:

In [None]:
# Numeric ascending sort on the cast column.
df.select('Data_value_int').sort('Data_value_int').show(10)

#### 3.5 Aplicar filtros

Simple

In [None]:
# Keep only rows whose numeric value exceeds 100 (Column-expression form).
df_filter = df.filter(F.col('Data_value_int') > 100)

In [None]:
# First 5 rows that pass the filter.
df_filter.show(n=5)

In [None]:
# Row counts before and after filtering.
(df.count(), df_filter.count())

In [None]:
# Same filter written as a SQL expression string.
df_filter = df.filter('Data_value_int > 100')

In [None]:
# Counts should match the Column-expression version above.
(df.count(), df_filter.count())

Varias columnas

In [None]:
# Combine conditions with & (logical AND); each comparison is built separately.
value_gt_100 = F.col('Data_value_int') > 100
status_is_r = F.col('STATUS') == 'R'
df_filter = df.filter(value_gt_100 & status_is_r)

In [None]:
# First 5 rows that satisfy both conditions.
df_filter.show(n=5)

In [None]:
# Row counts before and after the two-condition filter.
(df.count(), df_filter.count())

In [None]:
# Same two conditions as a single SQL expression string.
df_filter = df.filter("Data_value_int > 100 and STATUS = 'R'")

In [None]:
# Counts should match the Column-expression version above.
(df.count(), df_filter.count())

#### 3.6 Separación de una columna (split)

In [None]:
# Split Period on a literal "." into Year (part 0) and Month (part 1), both cast to int.
# The split pattern is a regular expression, so the dot is escaped. The raw string r'\.'
# avoids the invalid-escape warning that the plain '\.' literal triggers in Python
# (DeprecationWarning, SyntaxWarning since 3.12).
# NOTE(review): assumes Period is a string like "2020.10"; if it were numeric, its
# string form could drop a trailing zero ("2020.1") and Month would be wrong — confirm.
period_parts = F.split('Period', r'\.')
df_year = (
    df
    .withColumn('Year', period_parts[0].cast(IntegerType()))
    .withColumn('Month', period_parts[1].cast(IntegerType()))
)

In [None]:
# First 5 rows, now with the Year and Month columns.
df_year.show(n=5)