# Big Data Course #2 - SparkSession: create and read DataFrames in PySpark

# Libraries

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

## SparkSession

*SparkSession*, introduced in Spark 2.0, is the entry point to Spark's functionality. A SparkSession object is available by default in the interactive PySpark shell (the Python counterpart of spark-shell), which we launch by typing **pyspark** in a terminal on our machine:

```bash
C:\Users\errodringer\suscribete>pyspark
```

It is exposed as the variable **spark**:

![spark-shell](images/spark-shell.PNG "Spark-Shell")  

Although *SparkContext* was the entry point before Spark 2.0, SparkSession has not completely replaced it: many SparkContext features are still available and used in Spark 2.0 and later. Internally, SparkSession creates a SparkConf and a SparkContext with the configuration provided.

### 1. Crear la SparkSession

In [3]:
spark = SparkSession.builder.appName('firstSession')\
    .config('spark.master', 'local[4]')\
    .config('spark.executor.memory', '1g')\
    .config("spark.sql.shuffle.partitions", 1)\
    .config('spark.driver.memory','1g')\
    .getOrCreate()

All the configuration properties available in Spark:

https://spark.apache.org/docs/latest/configuration.html

In [4]:
spark

The initial configuration can be read and changed at runtime:

In [5]:
spark.conf.get('spark.sql.shuffle.partitions')

'1'

In [6]:
spark.conf.set("spark.sql.shuffle.partitions", 2)

In [7]:
spark.conf.get('spark.sql.shuffle.partitions')

'2'

In [28]:
spark.sparkContext.getConf().getAll()

[('spark.sql.shuffle.partitions', '1'),
 ('spark.app.name', 'firstSession'),
 ('spark.driver.host', 'DESKTOP-1EE24UT'),
 ('spark.master', 'local[4]'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.memory', '1g'),
 ('spark.driver.port', '65081'),
 ('spark.app.id', 'local-1618732648653'),
 ('spark.sql.warehouse.dir',
  'file:/C:/Users/rodgo/errodringer/cursoBigData/02_PrimerosPasosConPySpark/code/spark-warehouse'),
 ('spark.executor.memory', '1g'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.startTime', '1618732647355'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.default.parallelism', '4'),
 ('spark.ui.showConsoleProgress', 'true')]

The command:

```python
spark.stop()
```

stops the application. We will use it at the end; for now we don't want to stop it 😘

In [9]:
# spark.stop()

### 2. Create a table

### 2.1 From a list

In [9]:
columnas = ["id", "nombre", "l"]
lista = [(1, "Errodringer", "a"), (2, "Paco", "b"), (3, "Hola", "c"), (4, "Adios", "d")]
lista

[(1, 'Errodringer', 'a'),
 (2, 'Paco', 'b'),
 (3, 'Hola', 'c'),
 (4, 'Adios', 'd')]

In [10]:
df_1 = spark.createDataFrame(lista, schema=columnas)

Total number of records (rows):

In [11]:
df_1.count()

4

We display *n* records by passing that number as the function's argument:

```python
show(n)
```

In [12]:
df_1.show(2)

+---+-----------+---+
| id|     nombre|  l|
+---+-----------+---+
|  1|Errodringer|  a|
|  2|       Paco|  b|
+---+-----------+---+
only showing top 2 rows



DataFrame columns:

In [13]:
df_1.columns

['id', 'nombre', 'l']

DataFrame schema (note that the Python integers were inferred as `long`):

In [14]:
df_1.printSchema()

root
 |-- id: long (nullable = true)
 |-- nombre: string (nullable = true)
 |-- l: string (nullable = true)



DataFrame summary (count, mean, standard deviation, min and max for each column):

In [15]:
df_1.describe().show()

+-------+------------------+------+----+
|summary|                id|nombre|   l|
+-------+------------------+------+----+
|  count|                 4|     4|   4|
|   mean|               2.5|  null|null|
| stddev|1.2909944487358056|  null|null|
|    min|                 1| Adios|   a|
|    max|                 4|  Paco|   d|
+-------+------------------+------+----+
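`describe()` reports the *sample* standard deviation (denominator n - 1), and min/max on string columns follow lexicographic order. You can check the values above with Python's `statistics` module, without Spark. The population standard deviation is printed only for contrast.

```python
import statistics

ids = [1, 2, 3, 4]
nombres = ["Errodringer", "Paco", "Hola", "Adios"]

# Sample standard deviation (n - 1), the statistic describe() reports
mean_id = statistics.mean(ids)
stdev_id = statistics.stdev(ids)
print(mean_id, stdev_id)

# Population standard deviation (n), shown only for contrast
print(statistics.pstdev(ids))

# For ASCII strings, min/max follow lexicographic order
print(min(nombres), max(nombres))
```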



In [16]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema_1 = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("l", StringType(), True)])

df_11 = spark.createDataFrame(lista, schema=schema_1)

In [17]:
df_11.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- l: string (nullable = true)



In [18]:
df_11.show()

+---+-----------+---+
| id|       name|  l|
+---+-----------+---+
|  1|Errodringer|  a|
|  2|       Paco|  b|
|  3|       Hola|  c|
|  4|      Adios|  d|
+---+-----------+---+



### 2.2 From a CSV file

In [19]:
df = spark.read.csv('data/business.csv', sep=',', header=True)

In [20]:
df.show(5)

+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|Series_reference| Period|Data_value|Suppressed|STATUS|  UNITS|Magnitude|             Subject|               Group|      Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|   BDCQ.SF1AA2CA|2016.06|  1116.386|      null|     F|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|   BDCQ.SF1AA2CA|2016.09|  1070.874|      null|     F|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted| 

In [21]:
df.show(5, truncate=False)

+----------------+-------+----------+----------+------+-------+---------+------------------------------+------------------------------+------------------------+--------------------+--------------+--------------+--------------+
|Series_reference|Period |Data_value|Suppressed|STATUS|UNITS  |Magnitude|Subject                       |Group                         |Series_title_1          |Series_title_2      |Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+-------+---------+------------------------------+------------------------------+------------------------+--------------------+--------------+--------------+--------------+
|BDCQ.SF1AA2CA   |2016.06|1116.386  |null      |F     |Dollars|6        |Business Data Collection - BDC|Industry by financial variable|Sales (operating income)|Forestry and Logging|Current prices|Unadjusted    |null          |
|BDCQ.SF1AA2CA   |2016.09|1070.874  |null      |F     |Dollars|6        |Business Data Colle

In [22]:
df.printSchema()

root
 |-- Series_reference: string (nullable = true)
 |-- Period: string (nullable = true)
 |-- Data_value: string (nullable = true)
 |-- Suppressed: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- UNITS: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Subject: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Series_title_1: string (nullable = true)
 |-- Series_title_2: string (nullable = true)
 |-- Series_title_3: string (nullable = true)
 |-- Series_title_4: string (nullable = true)
 |-- Series_title_5: string (nullable = true)



In [23]:
df.count()

1936

Writing a DataFrame, in this case to *parquet* files, a columnar file format optimized for big data environments with large volumes of data.

In [25]:
df.write.parquet("parquet_example", mode='overwrite')

The output was written as a single data file inside the `parquet_example` directory because the DataFrame is held in a single partition: Spark writes one part file per partition.

### 2.3 From a Parquet file

In [48]:
df_p = spark.read.parquet('parquet_example')

In [49]:
df_p.count()

1936

In [50]:
df_p.show(2)

+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|Series_reference| Period|Data_value|Suppressed|STATUS|  UNITS|Magnitude|             Subject|               Group|      Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|   BDCQ.SF1AA2CA|2016.06|  1116.386|      null|     F|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted|          null|
|   BDCQ.SF1AA2CA|2016.09|  1070.874|      null|     F|Dollars|        6|Business Data Col...|Industry by finan...|Sales (operating ...|Forestry and Logging|Current prices|    Unadjusted| 

In [57]:
df_p.describe().show()

+-------+----------------+------------------+------------------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|summary|Series_reference|            Period|        Data_value|Suppressed|STATUS|  UNITS|Magnitude|             Subject|               Group|      Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+-------+----------------+------------------+------------------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|  count|            1936|              1936|              1936|         0|  1936|   1936|     1936|                1936|                1936|                1936|                1936|          1936|          1936|             0|
|   mean|            null| 2018.217975206615|2704.3055568181853|      null|  nul

In [53]:
df_pandas = df_p.toPandas()

In [54]:
df_pandas.head()

| | Series_reference | Period | Data_value | Suppressed | STATUS | UNITS | Magnitude | Subject | Group | Series_title_1 | Series_title_2 | Series_title_3 | Series_title_4 | Series_title_5 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | BDCQ.SF1AA2CA | 2016.06 | 1116.386 | | F | Dollars | 6 | Business Data Collection - BDC | Industry by financial variable | Sales (operating income) | Forestry and Logging | Current prices | Unadjusted | |
| 1 | BDCQ.SF1AA2CA | 2016.09 | 1070.874 | | F | Dollars | 6 | Business Data Collection - BDC | Industry by financial variable | Sales (operating income) | Forestry and Logging | Current prices | Unadjusted | |
| 2 | BDCQ.SF1AA2CA | 2016.12 | 1054.408 | | F | Dollars | 6 | Business Data Collection - BDC | Industry by financial variable | Sales (operating income) | Forestry and Logging | Current prices | Unadjusted | |
| 3 | BDCQ.SF1AA2CA | 2017.03 | 1010.665 | | F | Dollars | 6 | Business Data Collection - BDC | Industry by financial variable | Sales (operating income) | Forestry and Logging | Current prices | Unadjusted | |
| 4 | BDCQ.SF1AA2CA | 2017.06 | 1233.7 | | F | Dollars | 6 | Business Data Collection - BDC | Industry by financial variable | Sales (operating income) | Forestry and Logging | Current prices | Unadjusted | |


In [9]:
# spark.stop()