# **PySpark (Batch) Test (ES)**
# Autor: Herrera, Franco Nahuel

## **Datasets**
En este workshop se van a utilizar dos datasets externos denominados [dat-ab-usos-2020.csv](https://datos.gob.ar/dataset/transporte-sube---cantidad-transacciones-usos-por-fecha/archivo/transporte_5de00d5f-2b36-491c-8660-426179f32e61) y [dat-ab-usos-2021.csv](https://datos.gob.ar/dataset/transporte-sube---cantidad-transacciones-usos-por-fecha/archivo/transporte_c01fc5e8-062f-424a-94c1-cfac1b33a129). Contienen información relacionada con las transacciones utilizando la tarjeta [SUBE(Sistema Único de Boleto Electrónico)](https://es.wikipedia.org/wiki/Sistema_%C3%9Anico_de_Boleto_Electr%C3%B3nico) en la República Argentina durante los años 2020 y 2021. Estas son las versiones sucias del conjunto de datos por lo que se deberá realizar una limpieza.

Los conjunto de datos contendrán la siguiente información:

Título de la columna|Tipo de dato|Descripción
---|---|---
dia_transporte|Fecha ISO-8601 (date)|día de transporte informado 
nombre_empresa|Texto (string)|nombre de la empresa de transporte 
linea| Texto (string)|descripción de la línea 
amba| Texto (string)|SI/NO 
tipo_transporte| Texto (string)|colectivo, tren, subte, lanchas  
jurisdiccion| Texto (string)|tipo de jurisdicción de la línea (NACIONAL, PROVINCIAL, MUNICIPAL): en caso de subte queda vacío 
provincia| Texto (string)|nombre de la provincia, en caso de ser jurisdicción provincial o municipal. Si es jurisdicción nacional figura JN. En caso de subte queda vacío 
municipio| Texto (string)|nombre del municipio, en caso de ser jurisdicción municipal. SI es jurisdicción nacional o provincial figura SD o SN respectivamente. En caso de subte queda vacío 
cantidad| Número entero (integer)|cantidad de transacciones de uso / check-in / checkout sin checkin / Venta de boletos, neteadas de eventuales reversas 
dato_preliminar| Texto (string)|SI/NO

También vamos a desarrollar manualmente dos dataframes:
- El primero va a representar el pago de subsidios al transporte de pasajeros en cada provincia de la República Argentina según la [resolución 276/2020](http://servicios.infoleg.gob.ar/infolegInternet/anexos/340000-344999/344443/norma.htm#:~:text=Que%2C%20posteriormente%2C%20se%20aprob%C3%B3%20la,QUINIENTOS%20MILLONES%20(%24%2010.500.000.000.) para el año 2021. Vamos a basarnos en la [tabla](http://servicios.infoleg.gob.ar/infolegInternet/anexos/340000-344999/344443/res196-3.jpg) presente en dicha resolución. También vamos a suponer que el estado nacional paga a cada provincia el día 10 de cada mes. Va a tener la siguiente estructura:

Título de la columna|Tipo de dato|Descripción
---|---|---
fecha_pago|Fecha ISO-8601 (date)|día de pago del subsidio 
provincia|Texto (string)|nombre de la provincia
total| Número entero (integer)|Monto total pagado

- El segundo va a representar la población de cada provincia de la República Argentina según el [censo 2010](https://es.wikipedia.org/wiki/Censo_argentino_de_2010). Va a tener la siguiente estructura:

Título de la columna|Tipo de dato|Descripción
---|---|---
provincia_nombre|Texto (string)|nombre de la provincia
poblacion_total|Número entero (integer)|población total.

## **Configuración de una sesión de PySpark**

Antes de que podamos empezar a procesar nuestros datos, necesitamos configurar una sesión de Pyspark para Google Colab. 

In [None]:
%%bash
apt-get update
apt-get install openjdk-8-jdk-headless -qq > /dev/null
wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
tar xf spark-2.3.1-bin-hadoop2.7.tgz
pip install -q findspark

Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:7 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:9 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/u

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark_Test") \
    .getOrCreate()
sc = spark.sparkContext
sc

## **Carga de datos**

Primero se va a copiar los archivos de datos localmente. Utilizaremos `wget` y `head`, que normalmente son comandos del shell. En el entorno del notebook, podemos ejecutar cualquier comando de la shell utilizando el precursor `!`. El propósito de nuestros comandos de shell son los siguientes:

- `wget` - Es un cliente HTTP que descargará nuestros archivos de datos y los guardará localmente en nuestro entorno del notebook.
- `head` - Al igual que el comando `.head()` de Pandas, el comando del shell `head` imprime por defecto las 10 primeras líneas de un fichero. Puede especificar más o menos líneas si lo desea. 

Vamos a ejecutar la siguiente celda para sacar los archivos *sucios* localmente. Vamos a escribir los archivos en el directorio `/tmp` en el entorno del cuaderno.


In [None]:
!wget https://archivos-datos.transporte.gob.ar/upload/Dat_Ab_Usos/dat-ab-usos-2020.csv 

--2022-04-21 12:12:32--  https://archivos-datos.transporte.gob.ar/upload/Dat_Ab_Usos/dat-ab-usos-2020.csv
Resolving archivos-datos.transporte.gob.ar (archivos-datos.transporte.gob.ar)... 190.220.16.243
Connecting to archivos-datos.transporte.gob.ar (archivos-datos.transporte.gob.ar)|190.220.16.243|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 42244065 (40M) [text/csv]
Saving to: ‘dat-ab-usos-2020.csv’


2022-04-21 12:12:44 (3.81 MB/s) - ‘dat-ab-usos-2020.csv’ saved [42244065/42244065]



In [None]:
!wget https://archivos-datos.transporte.gob.ar/upload/Dat_Ab_Usos/dat-ab-usos-2021.csv

--2022-04-21 12:12:44--  https://archivos-datos.transporte.gob.ar/upload/Dat_Ab_Usos/dat-ab-usos-2021.csv
Resolving archivos-datos.transporte.gob.ar (archivos-datos.transporte.gob.ar)... 190.220.16.243
Connecting to archivos-datos.transporte.gob.ar (archivos-datos.transporte.gob.ar)|190.220.16.243|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 49039779 (47M) [text/csv]
Saving to: ‘dat-ab-usos-2021.csv’


2022-04-21 12:13:01 (2.78 MB/s) - ‘dat-ab-usos-2021.csv’ saved [49039779/49039779]



In [None]:
!head dat-ab-usos-2020.csv
!echo "-------"
!head dat-ab-usos-2021.csv

DIA_TRANSPORTE,"NOMBRE_EMPRESA","LINEA","AMBA","TIPO_TRANSPORTE","JURISDICCION","PROVINCIA","MUNICIPIO",CANTIDAD,"DATO_PRELIMINAR"
2020-01-01,"EMPRESA BATAN S.A.","BS_AS_LINEA 715M","NO","COLECTIVO","MUNICIPAL","BUENOS AIRES","GENERAL PUEYRREDON",2154,"NO"
2020-01-01,"COMPAÑIA  DE TRANSPORTE VECINAL S.A.","BS_AS_LINEA_326","SI","COLECTIVO","PROVINCIAL","BUENOS AIRES","SN",1492,"NO"
2020-01-01,"EMPRESA DE TRANSPORTE PERALTA RAMOS SACI","BS_AS_LINEA_512","NO","COLECTIVO","MUNICIPAL","BUENOS AIRES","GENERAL PUEYRREDON",1889,"NO"
2020-01-01,"AUTOBUSES BUENOS AIRES S.R.L. – TRANSPORTE LARRAZABAL C.I.S.A. – UNION TRANSITORIA (UT)","BS_AS_LINEA_514","SI","COLECTIVO","MUNICIPAL","BUENOS AIRES","ALMIRANTE BROWN",4669,"NO"
2020-01-01,"EL URBANO SRL","BS_AS_LINEA_522","SI","COLECTIVO","MUNICIPAL","BUENOS AIRES","LANUS",187,"NO"
2020-01-01,"EL URBANO SRL","BS_AS_LINEA_527","SI","COLECTIVO","MUNICIPAL","BUENOS AIRES","LANUS",543,"NO"
2020-01-01,"TRANSPORTES LINEA 123 S.A.C.I.","BS_ASLINEA_12

In [None]:
import os
base_dir = os.path.join('./')
sube_2020_filename = os.path.join(base_dir, 'dat-ab-usos-2020.csv')
sube_2021_filename = os.path.join(base_dir, 'dat-ab-usos-2021.csv')

## **Crear los DataFrames**

Veamos qué hace Spark con nuestros datos y si puede analizar correctamente la salida. Para ello, primero cargaremos el contenido en un DataFrame utilizando el método `spark.read.csv()`. 

Pasaremos tres argumentos:

 - La ruta del archivo(s)
 - Una entrada para `header=True`. Nuestros archivos tienen una fila de cabecera, así que debemos especificar eso.
 -El segundo argumento es `inferSchema=True`, que sirve para que Spark infiera el tipo de dato de las columnas.
 - El último argumento que añadimos es la opción `delimiter`, que especifica el separador de campos. A menudo, en los archivos CSV es una coma (`,`).

Dependiendo de lo familiarizado que el lector esté con Spark, se debe tener en cuenta que esta línea no lee realmente el contenido de los archivos todavía. Este comando es *perezoso*, lo que significa que no se ejecuta hasta que solicitemos a Spark que ejecute algún tipo de acción. Este es un punto crucial para entender cómo se comporta el procesamiento de datos dentro de Spark. 

In [None]:
df_raw_2020 = spark.read.options(header=True,inferSchema=True,delimiter=",").csv(sube_2020_filename)
df_raw_2021 = spark.read.options(header=True,inferSchema=True,delimiter=",").csv(sube_2021_filename)

Vamos a proceder a crear el dataframe de los subsidios para el transporte para cada provincia.

Vamos a utilizar los tipos de datos de la librería `pyspark.sql.types` para asignar formato acorde a las columnas. Vamos a agregar cada registro ingresándolos como un diccionario. Luego creamos el dataframe con la función `.createDataFrame()`

In [None]:
from pyspark.sql.types import Row, StructType, StructField,IntegerType,StringType,DateType
from datetime import datetime
import pyspark.sql.functions as F

data_subsidio_schema = [StructField("FECHA_PAGO",DateType()),StructField("PROVINCIA",StringType(),True),
StructField("TOTAL",IntegerType(),True)]

data_subsidio_final_struct = StructType(fields=data_subsidio_schema)

data_subsidio_elements = [

{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'BUENOS AIRES','TOTAL':113585263},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'BUENOS AIRES','TOTAL':113585263},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'BUENOS AIRES','TOTAL':113585263},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'BUENOS AIRES','TOTAL':113585263},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'BUENOS AIRES','TOTAL':113585263},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'BUENOS AIRES','TOTAL':113585263},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'BUENOS AIRES','TOTAL':113585263},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'BUENOS AIRES','TOTAL':113585263},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'BUENOS AIRES','TOTAL':113585263},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'BUENOS AIRES','TOTAL':113585263},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'BUENOS AIRES','TOTAL':113585263},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'BUENOS AIRES','TOTAL':113585263},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'MENDOZA','TOTAL':181728248},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'MENDOZA','TOTAL':181728248},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'MENDOZA','TOTAL':181728248},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'MENDOZA','TOTAL':181728248},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'MENDOZA','TOTAL':181728248},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'MENDOZA','TOTAL':181728248},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'MENDOZA','TOTAL':181728248},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'MENDOZA','TOTAL':181728248},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'MENDOZA','TOTAL':181728248},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'MENDOZA','TOTAL':181728248},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'MENDOZA','TOTAL':181728248},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'MENDOZA','TOTAL':181728248},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'SAN JUAN','TOTAL':53808599},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'SAN JUAN','TOTAL':53808599},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'SAN JUAN','TOTAL':53808599},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'SAN JUAN','TOTAL':53808599},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'SAN JUAN','TOTAL':53808599},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'SAN JUAN','TOTAL':53808599},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'SAN JUAN','TOTAL':53808599},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'SAN JUAN','TOTAL':53808599},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'SAN JUAN','TOTAL':53808599},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'SAN JUAN','TOTAL':53808599},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'SAN JUAN','TOTAL':53808599},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'SAN JUAN','TOTAL':53808599},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'JUJUY','TOTAL':62841126},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'JUJUY','TOTAL':62841126},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'JUJUY','TOTAL':62841126},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'JUJUY','TOTAL':62841126},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'JUJUY','TOTAL':62841126},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'JUJUY','TOTAL':62841126},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'JUJUY','TOTAL':62841126},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'JUJUY','TOTAL':62841126},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'JUJUY','TOTAL':62841126},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'JUJUY','TOTAL':62841126},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'JUJUY','TOTAL':62841126},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'JUJUY','TOTAL':62841126},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'NEUQUÉN','TOTAL':30166633},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'NEUQUÉN','TOTAL':30166633},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'NEUQUÉN','TOTAL':30166633},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'NEUQUÉN','TOTAL':30166633},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'NEUQUÉN','TOTAL':30166633},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'NEUQUÉN','TOTAL':30166633},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'NEUQUÉN','TOTAL':30166633},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'NEUQUÉN','TOTAL':30166633},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'NEUQUÉN','TOTAL':30166633},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'NEUQUÉN','TOTAL':30166633},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'NEUQUÉN','TOTAL':30166633},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'NEUQUÉN','TOTAL':30166633},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'SANTA FE','TOTAL':226238771},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'SANTA FE','TOTAL':226238771},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'SANTA FE','TOTAL':226238771},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'SANTA FE','TOTAL':226238771},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'SANTA FE','TOTAL':226238771},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'SANTA FE','TOTAL':226238771},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'SANTA FE','TOTAL':226238771},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'SANTA FE','TOTAL':226238771},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'SANTA FE','TOTAL':226238771},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'SANTA FE','TOTAL':226238771},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'SANTA FE','TOTAL':226238771},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'SANTA FE','TOTAL':226238771},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'ENTRE RÍOS','TOTAL':51739340},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'ENTRE RÍOS','TOTAL':51739340},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'ENTRE RÍOS','TOTAL':51739340},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'ENTRE RÍOS','TOTAL':51739340},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'ENTRE RÍOS','TOTAL':51739340},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'ENTRE RÍOS','TOTAL':51739340},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'ENTRE RÍOS','TOTAL':51739340},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'ENTRE RÍOS','TOTAL':51739340},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'ENTRE RÍOS','TOTAL':51739340},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'ENTRE RÍOS','TOTAL':51739340},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'ENTRE RÍOS','TOTAL':51739340},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'ENTRE RÍOS','TOTAL':51739340},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'RÍO NEGRO','TOTAL':22263646},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'RÍO NEGRO','TOTAL':22263646},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'RÍO NEGRO','TOTAL':22263646},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'RÍO NEGRO','TOTAL':22263646},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'RÍO NEGRO','TOTAL':22263646},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'RÍO NEGRO','TOTAL':22263646},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'RÍO NEGRO','TOTAL':22263646},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'RÍO NEGRO','TOTAL':22263646},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'RÍO NEGRO','TOTAL':22263646},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'RÍO NEGRO','TOTAL':22263646},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'RÍO NEGRO','TOTAL':22263646},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'RÍO NEGRO','TOTAL':22263646},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'CHUBUT','TOTAL':31456364},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'CHUBUT','TOTAL':31456364},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'CHUBUT','TOTAL':31456364},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'CHUBUT','TOTAL':31456364},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'CHUBUT','TOTAL':31456364},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'CHUBUT','TOTAL':31456364},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'CHUBUT','TOTAL':31456364},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'CHUBUT','TOTAL':31456364},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'CHUBUT','TOTAL':31456364},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'CHUBUT','TOTAL':31456364},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'CHUBUT','TOTAL':31456364},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'CHUBUT','TOTAL':31456364},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'CHACO','TOTAL':37540408},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'CHACO','TOTAL':37540408},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'CHACO','TOTAL':37540408},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'CHACO','TOTAL':37540408},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'CHACO','TOTAL':37540408},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'CHACO','TOTAL':37540408},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'CHACO','TOTAL':37540408},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'CHACO','TOTAL':37540408},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'CHACO','TOTAL':37540408},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'CHACO','TOTAL':37540408},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'CHACO','TOTAL':37540408},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'CHACO','TOTAL':37540408},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'CATAMARCA','TOTAL':18657491},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'CATAMARCA','TOTAL':18657491},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'CATAMARCA','TOTAL':18657491},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'CATAMARCA','TOTAL':18657491},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'CATAMARCA','TOTAL':18657491},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'CATAMARCA','TOTAL':18657491},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'CATAMARCA','TOTAL':18657491},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'CATAMARCA','TOTAL':18657491},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'CATAMARCA','TOTAL':18657491},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'CATAMARCA','TOTAL':18657491},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'CATAMARCA','TOTAL':18657491},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'CATAMARCA','TOTAL':18657491},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'SAN LUIS','TOTAL':25417728},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'SAN LUIS','TOTAL':25417728},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'SAN LUIS','TOTAL':25417728},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'SAN LUIS','TOTAL':25417728},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'SAN LUIS','TOTAL':25417728},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'SAN LUIS','TOTAL':25417728},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'SAN LUIS','TOTAL':25417728},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'SAN LUIS','TOTAL':25417728},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'SAN LUIS','TOTAL':25417728},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'SAN LUIS','TOTAL':25417728},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'SAN LUIS','TOTAL':25417728},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'SAN LUIS','TOTAL':25417728},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'FORMOSA','TOTAL':9992351},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'FORMOSA','TOTAL':9992351},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'FORMOSA','TOTAL':9992351},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'FORMOSA','TOTAL':9992351},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'FORMOSA','TOTAL':9992351},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'FORMOSA','TOTAL':9992351},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'FORMOSA','TOTAL':9992351},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'FORMOSA','TOTAL':9992351},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'FORMOSA','TOTAL':9992351},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'FORMOSA','TOTAL':9992351},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'FORMOSA','TOTAL':9992351},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'FORMOSA','TOTAL':9992351},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'CORRIENTES','TOTAL':37773991},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'CORRIENTES','TOTAL':37773991},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'CORRIENTES','TOTAL':37773991},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'CORRIENTES','TOTAL':37773991},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'CORRIENTES','TOTAL':37773991},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'CORRIENTES','TOTAL':37773991},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'CORRIENTES','TOTAL':37773991},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'CORRIENTES','TOTAL':37773991},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'CORRIENTES','TOTAL':37773991},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'CORRIENTES','TOTAL':37773991},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'CORRIENTES','TOTAL':37773991},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'CORRIENTES','TOTAL':37773991},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'LA PAMPA','TOTAL':5517335},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'LA PAMPA','TOTAL':5517335},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'LA PAMPA','TOTAL':5517335},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'LA PAMPA','TOTAL':5517335},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'LA PAMPA','TOTAL':5517335},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'LA PAMPA','TOTAL':5517335},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'LA PAMPA','TOTAL':5517335},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'LA PAMPA','TOTAL':5517335},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'LA PAMPA','TOTAL':5517335},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'LA PAMPA','TOTAL':5517335},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'LA PAMPA','TOTAL':5517335},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'LA PAMPA','TOTAL':5517335},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'TIERRA DEL FUEGO','TOTAL':2889858},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'TIERRA DEL FUEGO','TOTAL':2889858},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'TIERRA DEL FUEGO','TOTAL':2889858},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'TIERRA DEL FUEGO','TOTAL':2889858},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'TIERRA DEL FUEGO','TOTAL':2889858},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'TIERRA DEL FUEGO','TOTAL':2889858},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'TIERRA DEL FUEGO','TOTAL':2889858},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'TIERRA DEL FUEGO','TOTAL':2889858},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'TIERRA DEL FUEGO','TOTAL':2889858},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'TIERRA DEL FUEGO','TOTAL':2889858},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'TIERRA DEL FUEGO','TOTAL':2889858},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'TIERRA DEL FUEGO','TOTAL':2889858},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'CORDOBA','TOTAL':295212638},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'CORDOBA','TOTAL':295212638},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'CORDOBA','TOTAL':295212638},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'CORDOBA','TOTAL':295212638},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'CORDOBA','TOTAL':295212638},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'CORDOBA','TOTAL':295212638},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'CORDOBA','TOTAL':295212638},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'CORDOBA','TOTAL':295212638},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'CORDOBA','TOTAL':295212638},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'CORDOBA','TOTAL':295212638},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'CORDOBA','TOTAL':295212638},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'CORDOBA','TOTAL':295212638},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'TUCUMÁN','TOTAL':152595198},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'TUCUMÁN','TOTAL':152595198},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'TUCUMÁN','TOTAL':152595198},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'TUCUMÁN','TOTAL':152595198},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'TUCUMÁN','TOTAL':152595198},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'TUCUMÁN','TOTAL':152595198},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'TUCUMÁN','TOTAL':152595198},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'TUCUMÁN','TOTAL':152595198},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'TUCUMÁN','TOTAL':152595198},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'TUCUMÁN','TOTAL':152595198},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'TUCUMÁN','TOTAL':152595198},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'TUCUMÁN','TOTAL':152595198},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'SALTA','TOTAL':98314988},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'SALTA','TOTAL':98314988},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'SALTA','TOTAL':98314988},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'SALTA','TOTAL':98314988},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'SALTA','TOTAL':98314988},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'SALTA','TOTAL':98314988},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'SALTA','TOTAL':98314988},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'SALTA','TOTAL':98314988},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'SALTA','TOTAL':98314988},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'SALTA','TOTAL':98314988},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'SALTA','TOTAL':98314988},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'SALTA','TOTAL':98314988},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'SANTA CRUZ','TOTAL':3243681},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'SANTA CRUZ','TOTAL':3243681},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'SANTA CRUZ','TOTAL':3243681},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'SANTA CRUZ','TOTAL':3243681},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'SANTA CRUZ','TOTAL':3243681},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'SANTA CRUZ','TOTAL':3243681},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'SANTA CRUZ','TOTAL':3243681},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'SANTA CRUZ','TOTAL':3243681},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'SANTA CRUZ','TOTAL':3243681},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'SANTA CRUZ','TOTAL':3243681},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'SANTA CRUZ','TOTAL':3243681},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'SANTA CRUZ','TOTAL':3243681},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'LA RIOJA','TOTAL':11594251},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'LA RIOJA','TOTAL':11594251},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'LA RIOJA','TOTAL':11594251},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'LA RIOJA','TOTAL':11594251},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'LA RIOJA','TOTAL':11594251},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'LA RIOJA','TOTAL':11594251},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'LA RIOJA','TOTAL':11594251},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'LA RIOJA','TOTAL':11594251},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'LA RIOJA','TOTAL':11594251},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'LA RIOJA','TOTAL':11594251},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'LA RIOJA','TOTAL':11594251},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'LA RIOJA','TOTAL':11594251},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'SANTIAGO DEL ESTERO','TOTAL':51541832},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'SANTIAGO DEL ESTERO','TOTAL':51541832},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'SANTIAGO DEL ESTERO','TOTAL':51541832},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'SANTIAGO DEL ESTERO','TOTAL':51541832},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'SANTIAGO DEL ESTERO','TOTAL':51541832},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'SANTIAGO DEL ESTERO','TOTAL':51541832},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'SANTIAGO DEL ESTERO','TOTAL':51541832},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'SANTIAGO DEL ESTERO','TOTAL':51541832},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'SANTIAGO DEL ESTERO','TOTAL':51541832},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'SANTIAGO DEL ESTERO','TOTAL':51541832},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'SANTIAGO DEL ESTERO','TOTAL':51541832},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'SANTIAGO DEL ESTERO','TOTAL':51541832},
{'FECHA_PAGO':datetime(2021,1,10),'PROVINCIA':'MISIONES','TOTAL':79100451},
{'FECHA_PAGO':datetime(2021,2,10),'PROVINCIA':'MISIONES','TOTAL':79100451},
{'FECHA_PAGO':datetime(2021,3,10),'PROVINCIA':'MISIONES','TOTAL':79100451},
{'FECHA_PAGO':datetime(2021,4,10),'PROVINCIA':'MISIONES','TOTAL':79100451},
{'FECHA_PAGO':datetime(2021,5,10),'PROVINCIA':'MISIONES','TOTAL':79100451},
{'FECHA_PAGO':datetime(2021,6,10),'PROVINCIA':'MISIONES','TOTAL':79100451},
{'FECHA_PAGO':datetime(2021,7,10),'PROVINCIA':'MISIONES','TOTAL':79100451},
{'FECHA_PAGO':datetime(2021,8,10),'PROVINCIA':'MISIONES','TOTAL':79100451},
{'FECHA_PAGO':datetime(2021,9,10),'PROVINCIA':'MISIONES','TOTAL':79100451},
{'FECHA_PAGO':datetime(2021,10,10),'PROVINCIA':'MISIONES','TOTAL':79100451},
{'FECHA_PAGO':datetime(2021,11,10),'PROVINCIA':'MISIONES','TOTAL':79100451},
{'FECHA_PAGO':datetime(2021,12,10),'PROVINCIA':'MISIONES','TOTAL':79100451}]

In [None]:
df_subsidio_2021= spark.createDataFrame(data_subsidio_elements,
                                          data_subsidio_final_struct)

In [None]:
df_subsidio_2021.show()

+----------+------------+---------+
|FECHA_PAGO|   PROVINCIA|    TOTAL|
+----------+------------+---------+
|2021-01-10|BUENOS AIRES|113585263|
|2021-02-10|BUENOS AIRES|113585263|
|2021-03-10|BUENOS AIRES|113585263|
|2021-04-10|BUENOS AIRES|113585263|
|2021-05-10|BUENOS AIRES|113585263|
|2021-06-10|BUENOS AIRES|113585263|
|2021-07-10|BUENOS AIRES|113585263|
|2021-08-10|BUENOS AIRES|113585263|
|2021-09-10|BUENOS AIRES|113585263|
|2021-10-10|BUENOS AIRES|113585263|
|2021-11-10|BUENOS AIRES|113585263|
|2021-12-10|BUENOS AIRES|113585263|
|2021-01-10|     MENDOZA|181728248|
|2021-02-10|     MENDOZA|181728248|
|2021-03-10|     MENDOZA|181728248|
|2021-04-10|     MENDOZA|181728248|
|2021-05-10|     MENDOZA|181728248|
|2021-06-10|     MENDOZA|181728248|
|2021-07-10|     MENDOZA|181728248|
|2021-08-10|     MENDOZA|181728248|
+----------+------------+---------+
only showing top 20 rows



También vamos a crear el dataframe de la población de cada provincia.

In [None]:
data_poblacion_schema = [StructField("PROVINCIA_NOMBRE",StringType(),True),
StructField("POBLACION_TOTAL",IntegerType(),True)]

data_poblacion_final_struct = StructType(fields=data_poblacion_schema)

data_poblacion_elements = [

{'PROVINCIA_NOMBRE':'BUENOS AIRES','POBLACION_TOTAL':15625083},
{'PROVINCIA_NOMBRE':'MENDOZA','POBLACION_TOTAL':1738929},
{'PROVINCIA_NOMBRE':'SAN JUAN','POBLACION_TOTAL':681055},
{'PROVINCIA_NOMBRE':'JUJUY','POBLACION_TOTAL':673307},
{'PROVINCIA_NOMBRE':'NEUQUÉN','POBLACION_TOTAL':551266},
{'PROVINCIA_NOMBRE':'SANTA FE','POBLACION_TOTAL':3194537},
{'PROVINCIA_NOMBRE':'ENTRE RÍOS','POBLACION_TOTAL':1235994},
{'PROVINCIA_NOMBRE':'RÍO NEGRO','POBLACION_TOTAL':638645},
{'PROVINCIA_NOMBRE':'CHUBUT','POBLACION_TOTAL':509108},
{'PROVINCIA_NOMBRE':'CHACO','POBLACION_TOTAL':1055259},
{'PROVINCIA_NOMBRE':'CATAMARCA','POBLACION_TOTAL':367828},
{'PROVINCIA_NOMBRE':'SAN LUIS','POBLACION_TOTAL':432310},
{'PROVINCIA_NOMBRE':'FORMOSA','POBLACION_TOTAL':530162},
{'PROVINCIA_NOMBRE':'CORRIENTES','POBLACION_TOTAL':992595},
{'PROVINCIA_NOMBRE':'LA PAMPA','POBLACION_TOTAL':318951},
{'PROVINCIA_NOMBRE':'TIERRA DEL FUEGO','POBLACION_TOTAL':127205},
{'PROVINCIA_NOMBRE':'CORDOBA','POBLACION_TOTAL':3308876},
{'PROVINCIA_NOMBRE':'TUCUMÁN','POBLACION_TOTAL':1448188},
{'PROVINCIA_NOMBRE':'SALTA','POBLACION_TOTAL':1214441},
{'PROVINCIA_NOMBRE':'SANTA CRUZ','POBLACION_TOTAL':273964},
{'PROVINCIA_NOMBRE':'LA RIOJA','POBLACION_TOTAL':333642},
{'PROVINCIA_NOMBRE':'SANTIAGO DEL ESTERO','POBLACION_TOTAL':874006},
{'PROVINCIA_NOMBRE':'MISIONES','POBLACION_TOTAL':1101593}                       
]

In [None]:
df_poblacion_2010 = spark.createDataFrame(data_poblacion_elements,
                                          data_poblacion_final_struct)

In [None]:
df_poblacion_2010.show()

+----------------+---------------+
|PROVINCIA_NOMBRE|POBLACION_TOTAL|
+----------------+---------------+
|    BUENOS AIRES|       15625083|
|         MENDOZA|        1738929|
|        SAN JUAN|         681055|
|           JUJUY|         673307|
|         NEUQUÉN|         551266|
|        SANTA FE|        3194537|
|      ENTRE RÍOS|        1235994|
|       RÍO NEGRO|         638645|
|          CHUBUT|         509108|
|           CHACO|        1055259|
|       CATAMARCA|         367828|
|        SAN LUIS|         432310|
|         FORMOSA|         530162|
|      CORRIENTES|         992595|
|        LA PAMPA|         318951|
|TIERRA DEL FUEGO|         127205|
|         CORDOBA|        3308876|
|         TUCUMÁN|        1448188|
|           SALTA|        1214441|
|      SANTA CRUZ|         273964|
+----------------+---------------+
only showing top 20 rows



## **Análisis inicial**

Vamos a ver las primeras 20 filas utilizando el método `.show()` en los DataFrames. Pasaremos los parámetros:

- El número de filas a mostrar (`20`)

*Nota: `.show()` es una acción de Spark, lo que significa que cualquier comando perezoso anterior se ejecutará antes de la acción real. Spark también puede optimizar estos comandos para obtener el mejor rendimiento según sea necesario.* 

In [None]:
df_raw_2020.show(20)

+-------------------+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+
|     DIA_TRANSPORTE|      NOMBRE_EMPRESA|           LINEA|AMBA|TIPO_TRANSPORTE|JURISDICCION|   PROVINCIA|         MUNICIPIO|CANTIDAD|DATO_PRELIMINAR|
+-------------------+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+
|2020-01-01 00:00:00|  EMPRESA BATAN S.A.|BS_AS_LINEA 715M|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    2154|             NO|
|2020-01-01 00:00:00|COMPAÑIA  DE TRAN...| BS_AS_LINEA_326|  SI|      COLECTIVO|  PROVINCIAL|BUENOS AIRES|                SN|    1492|             NO|
|2020-01-01 00:00:00|EMPRESA DE TRANSP...| BS_AS_LINEA_512|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1889|             NO|
|2020-01-01 00:00:00|AUTOBUSES BUENOS ...| BS_AS_LINEA_514|  SI|      COLECTIVO|   MUNICIPAL|B

In [None]:
df_raw_2021.show(20)

+-------------------+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+
|     DIA_TRANSPORTE|      NOMBRE_EMPRESA|           LINEA|AMBA|TIPO_TRANSPORTE|JURISDICCION|   PROVINCIA|         MUNICIPIO|CANTIDAD|DATO_PRELIMINAR|
+-------------------+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+
|2021-01-01 00:00:00|  EMPRESA BATAN S.A.|BS_AS_LINEA 715M|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1466|             NO|
|2021-01-01 00:00:00|COMPAÑIA  DE TRAN...| BS_AS_LINEA_326|  SI|      COLECTIVO|  PROVINCIAL|BUENOS AIRES|                SN|     625|             NO|
|2021-01-01 00:00:00|EMPRESA DE TRANSP...| BS_AS_LINEA_512|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1119|             NO|
|2021-01-01 00:00:00|AUTOBUSES BUENOS ...| BS_AS_LINEA_514|  SI|      COLECTIVO|   MUNICIPAL|B

Podemos utilizar el método `.printSchema()` para imprimir el esquema inferido asociado a los datos. Observe que tenemos 10 columnas (lo que se espera en base a nuestra información de formato) y cada campo es anulable. 
*`.printSchema()` es también una acción de Spark.*

In [None]:
df_raw_2020.printSchema()

root
 |-- DIA_TRANSPORTE: timestamp (nullable = true)
 |-- NOMBRE_EMPRESA: string (nullable = true)
 |-- LINEA: string (nullable = true)
 |-- AMBA: string (nullable = true)
 |-- TIPO_TRANSPORTE: string (nullable = true)
 |-- JURISDICCION: string (nullable = true)
 |-- PROVINCIA: string (nullable = true)
 |-- MUNICIPIO: string (nullable = true)
 |-- CANTIDAD: integer (nullable = true)
 |-- DATO_PRELIMINAR: string (nullable = true)



In [None]:
df_raw_2021.printSchema()

root
 |-- DIA_TRANSPORTE: timestamp (nullable = true)
 |-- NOMBRE_EMPRESA: string (nullable = true)
 |-- LINEA: string (nullable = true)
 |-- AMBA: string (nullable = true)
 |-- TIPO_TRANSPORTE: string (nullable = true)
 |-- JURISDICCION: string (nullable = true)
 |-- PROVINCIA: string (nullable = true)
 |-- MUNICIPIO: string (nullable = true)
 |-- CANTIDAD: integer (nullable = true)
 |-- DATO_PRELIMINAR: string (nullable = true)



Vamos a ejecutar un método `.count()` en nuestros dataframes para determinar cuántas filas están presentes en cada conjunto de datos, independientemente de si son correctas.

In [None]:
raw_count_2020 = df_raw_2020.count()
print(raw_count_2020)

356628


In [None]:
raw_count_2021 = df_raw_2021.count()
print(raw_count_2021)

410644


Podemos observar que en el dataset crudo del 2020 tenemos 356628 registros, mientras que en el otro contamos con 410644 filas.

## **Limpieza de datos**

Utilizaremos algunos métodos del módulo `pyspark.sql.functions` para ayudarnos a determinar los valores nulos o faltantes. 

Vamos a utilizar el método `.count()` para contar la cantidad de observaciónes nulas en cada columna de nuestros dataframes. Para ello utilizamos las funciónes `.when()` y `.col()`. También utilizaremos la función `.isNull()` para encontrar sólo las entradas que son nulas. Esto se pasa al método `.select()` en el marco de datos para devolver sólo las filas que cumplen este requisito. Finalmente ejecutamos el método `.show()` para poder visualizar el dataframe resultante con el conteo total de los valores nulos por cada columna.

Un ejemplo para realizar este conteo en un dataframe de Spark llamado `df_1` sería:

```
from pyspark.sql.functions import when, count, col
df_1.select([count(when(col(c).isNull(), c)).alias(c) for c in df_1.columns]).show()
```

In [None]:
from pyspark.sql.functions import when, count, col
df_raw_2020.select([count(when(col(c).isNull(), c)).alias(c) for c in df_raw_2020.columns]).show()

+--------------+--------------+-----+----+---------------+------------+---------+---------+--------+---------------+
|DIA_TRANSPORTE|NOMBRE_EMPRESA|LINEA|AMBA|TIPO_TRANSPORTE|JURISDICCION|PROVINCIA|MUNICIPIO|CANTIDAD|DATO_PRELIMINAR|
+--------------+--------------+-----+----+---------------+------------+---------+---------+--------+---------------+
|             0|             0|    0|   0|              0|        2561|     2561|     2561|       0|              0|
+--------------+--------------+-----+----+---------------+------------+---------+---------+--------+---------------+



In [None]:
df_raw_2021.select([count(when(col(c).isNull(), c)).alias(c) for c in df_raw_2021.columns]).show()

+--------------+--------------+-----+----+---------------+------------+---------+---------+--------+---------------+
|DIA_TRANSPORTE|NOMBRE_EMPRESA|LINEA|AMBA|TIPO_TRANSPORTE|JURISDICCION|PROVINCIA|MUNICIPIO|CANTIDAD|DATO_PRELIMINAR|
+--------------+--------------+-----+----+---------------+------------+---------+---------+--------+---------------+
|             0|             0|    0|   0|              0|        2974|     2974|     2974|       0|              0|
+--------------+--------------+-----+----+---------------+------------+---------+---------+--------+---------------+



Podemos ver que el dataset crudo del año 2020 cuenta con 2561 valores nulos. Mientras que  el dataset crudo del año 2021 cuenta con 2974 valores nulos. Ambos en las columnas `JURISDICCIÓN`,`PROVINCIA` y `MUNICIPIO`.

Para eliminar las observaciónes nulas debemos utilizar el método `.dropna()` que devuelve un nuevo DataFrame omitiendo las filas con valores nulos.
Finalmente ejecutamos el método `.show()` para poder visualizar el dataframe resultante.

In [None]:
df_clean_2020 = df_raw_2020.dropna()
df_clean_2020.show()

+-------------------+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+
|     DIA_TRANSPORTE|      NOMBRE_EMPRESA|           LINEA|AMBA|TIPO_TRANSPORTE|JURISDICCION|   PROVINCIA|         MUNICIPIO|CANTIDAD|DATO_PRELIMINAR|
+-------------------+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+
|2020-01-01 00:00:00|  EMPRESA BATAN S.A.|BS_AS_LINEA 715M|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    2154|             NO|
|2020-01-01 00:00:00|COMPAÑIA  DE TRAN...| BS_AS_LINEA_326|  SI|      COLECTIVO|  PROVINCIAL|BUENOS AIRES|                SN|    1492|             NO|
|2020-01-01 00:00:00|EMPRESA DE TRANSP...| BS_AS_LINEA_512|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1889|             NO|
|2020-01-01 00:00:00|AUTOBUSES BUENOS ...| BS_AS_LINEA_514|  SI|      COLECTIVO|   MUNICIPAL|B

In [None]:
df_clean_2021 = df_raw_2021.dropna()
df_clean_2021.show()

+-------------------+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+
|     DIA_TRANSPORTE|      NOMBRE_EMPRESA|           LINEA|AMBA|TIPO_TRANSPORTE|JURISDICCION|   PROVINCIA|         MUNICIPIO|CANTIDAD|DATO_PRELIMINAR|
+-------------------+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+
|2021-01-01 00:00:00|  EMPRESA BATAN S.A.|BS_AS_LINEA 715M|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1466|             NO|
|2021-01-01 00:00:00|COMPAÑIA  DE TRAN...| BS_AS_LINEA_326|  SI|      COLECTIVO|  PROVINCIAL|BUENOS AIRES|                SN|     625|             NO|
|2021-01-01 00:00:00|EMPRESA DE TRANSP...| BS_AS_LINEA_512|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1119|             NO|
|2021-01-01 00:00:00|AUTOBUSES BUENOS ...| BS_AS_LINEA_514|  SI|      COLECTIVO|   MUNICIPAL|B

Podemos utilizar la función `.describe()` para calcular las estadísticas básicas de la columna numérica `CANTIDAD`. Luego emplear el método `.show()` para ver el dataframe resultante.

In [None]:
df_clean_2020.select("CANTIDAD").describe().show()

+-------+------------------+
|summary|          CANTIDAD|
+-------+------------------+
|  count|            354067|
|   mean| 4853.705801444359|
| stddev|12428.229703937393|
|    min|               -43|
|    max|            603766|
+-------+------------------+



In [None]:
df_clean_2021.select("CANTIDAD").describe().show()

+-------+------------------+
|summary|          CANTIDAD|
+-------+------------------+
|  count|            407670|
|   mean| 6655.097257585792|
| stddev|14231.603960343082|
|    min|               -15|
|    max|            477857|
+-------+------------------+



Podemos observar que ambas tienen como mínimo valores negativos, el dataset del 2020 tiene el valor mínimo de -43 y el dataset del 2021 tiene como mínimo el valor -15. Si bien es cierto que si el usuario no tiene saldo, puede viajar con saldo negativo pero no vamos a considerarlo y podriamos realizar un filtrado teniendo en cuenta los valores mayores a 0 en la columna `CANTIDAD`.
Otros filtrados que se va a realizar son:
- Solo considerar el medio de transporte COLECTIVO en la columna `TIPO_DE_TRANSPORTE`. Esto para estudiar el uso de la tarjeta SUBE en este medio de transporte.
- No considerar la jurisdicción NACIONAL en la columna `JURISDICCION`. Se toma esta decisión porqué no se sabría el nombre exacto de la provincia en cuestión.

Luego, utilizamos el método `.show()` para observar el dataframe resultante.

In [None]:
df_final_2020 = df_clean_2020.filter((df_clean_2020["CANTIDAD"]>0) 
& (df_clean_2020["TIPO_TRANSPORTE"]=="COLECTIVO")
& (df_clean_2020["JURISDICCION"]!="NACIONAL"))
df_final_2020.show()

+-------------------+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+
|     DIA_TRANSPORTE|      NOMBRE_EMPRESA|           LINEA|AMBA|TIPO_TRANSPORTE|JURISDICCION|   PROVINCIA|         MUNICIPIO|CANTIDAD|DATO_PRELIMINAR|
+-------------------+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+
|2020-01-01 00:00:00|  EMPRESA BATAN S.A.|BS_AS_LINEA 715M|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    2154|             NO|
|2020-01-01 00:00:00|COMPAÑIA  DE TRAN...| BS_AS_LINEA_326|  SI|      COLECTIVO|  PROVINCIAL|BUENOS AIRES|                SN|    1492|             NO|
|2020-01-01 00:00:00|EMPRESA DE TRANSP...| BS_AS_LINEA_512|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1889|             NO|
|2020-01-01 00:00:00|AUTOBUSES BUENOS ...| BS_AS_LINEA_514|  SI|      COLECTIVO|   MUNICIPAL|B

In [None]:
df_final_2021 = df_clean_2021.filter((df_clean_2021["CANTIDAD"]>0) 
& (df_clean_2021["TIPO_TRANSPORTE"]=="COLECTIVO")
& (df_clean_2021["JURISDICCION"]!="NACIONAL"))
df_final_2021.show()

+-------------------+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+
|     DIA_TRANSPORTE|      NOMBRE_EMPRESA|           LINEA|AMBA|TIPO_TRANSPORTE|JURISDICCION|   PROVINCIA|         MUNICIPIO|CANTIDAD|DATO_PRELIMINAR|
+-------------------+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+
|2021-01-01 00:00:00|  EMPRESA BATAN S.A.|BS_AS_LINEA 715M|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1466|             NO|
|2021-01-01 00:00:00|COMPAÑIA  DE TRAN...| BS_AS_LINEA_326|  SI|      COLECTIVO|  PROVINCIAL|BUENOS AIRES|                SN|     625|             NO|
|2021-01-01 00:00:00|EMPRESA DE TRANSP...| BS_AS_LINEA_512|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1119|             NO|
|2021-01-01 00:00:00|AUTOBUSES BUENOS ...| BS_AS_LINEA_514|  SI|      COLECTIVO|   MUNICIPAL|B

Empleamos el método `.count()` para conocer las observaciónes que quedaron en los dataframes luego de la limpieza y el filtrado.

In [None]:
final_count_2020 = df_final_2020.count()
print(final_count_2020)

295403


In [None]:
final_count_2021 = df_final_2021.count()
print(final_count_2021)

346943


In [None]:
lost_data_2020 = ((raw_count_2020-final_count_2020)/(raw_count_2020)) * 100
print(lost_data_2020)

17.16774902699732


In [None]:
lost_data_2021 = ((raw_count_2021-final_count_2021)/(raw_count_2021)) * 100
print(lost_data_2021)

15.512463350249853


Podemos ver que en el dataframe del 2020 se perdieron el 17.16% de los valores y en el del 2021 perdimos el 15.51%. Por lo que no se llegó a descartar más del 70% de los datos.

Nuestro siguiente paso para limpiar este conjunto de datos en Pyspark consiste en determinar cuántas columnas deberíamos tener en nuestro conjunto de datos y ocuparnos de los valores atípicos. Sabemos por ejemplo que la columna `DIA_TRANSPORTE` fue inferida por Spark de tipo `Timestamp` cuando debería haber sido `DateType`.

Vamos a proceder a separar la fecha de la hora y luego añadirla en otra columna. Para ello vamos a utilizar:

- `F.split()`: Esto actúa de forma similar al método `split()` de Python, dividiendo el contenido de una columna del dataframe en un carácter especificado en una columna ArrayType de Spark (es decir, la versión de Spark de una variable de lista). 
- `.withColumn()`: Crea un nuevo marco de datos con una columna dada.

In [None]:
from pyspark.sql import functions as F
date_split = F.split(df_final_2020["DIA_TRANSPORTE"],' ')

df_2020 = df_final_2020.withColumn("FECHA_TRANSPORTE",date_split.getItem(0))

df_2020 = df_2020.drop("DIA_TRANSPORTE")

In [None]:
df_2020.show()

+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+----------------+
|      NOMBRE_EMPRESA|           LINEA|AMBA|TIPO_TRANSPORTE|JURISDICCION|   PROVINCIA|         MUNICIPIO|CANTIDAD|DATO_PRELIMINAR|FECHA_TRANSPORTE|
+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+----------------+
|  EMPRESA BATAN S.A.|BS_AS_LINEA 715M|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    2154|             NO|      2020-01-01|
|COMPAÑIA  DE TRAN...| BS_AS_LINEA_326|  SI|      COLECTIVO|  PROVINCIAL|BUENOS AIRES|                SN|    1492|             NO|      2020-01-01|
|EMPRESA DE TRANSP...| BS_AS_LINEA_512|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1889|             NO|      2020-01-01|
|AUTOBUSES BUENOS ...| BS_AS_LINEA_514|  SI|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|   ALMIRANTE BROWN|    466

In [None]:
date_split = F.split(df_final_2021["DIA_TRANSPORTE"],' ')

df_2021 = df_final_2021.withColumn("FECHA_TRANSPORTE",date_split.getItem(0))

df_2021 = df_2021.drop("DIA_TRANSPORTE")

In [None]:
df_2021.show()

+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+----------------+
|      NOMBRE_EMPRESA|           LINEA|AMBA|TIPO_TRANSPORTE|JURISDICCION|   PROVINCIA|         MUNICIPIO|CANTIDAD|DATO_PRELIMINAR|FECHA_TRANSPORTE|
+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+----------------+
|  EMPRESA BATAN S.A.|BS_AS_LINEA 715M|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1466|             NO|      2021-01-01|
|COMPAÑIA  DE TRAN...| BS_AS_LINEA_326|  SI|      COLECTIVO|  PROVINCIAL|BUENOS AIRES|                SN|     625|             NO|      2021-01-01|
|EMPRESA DE TRANSP...| BS_AS_LINEA_512|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1119|             NO|      2021-01-01|
|AUTOBUSES BUENOS ...| BS_AS_LINEA_514|  SI|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|   ALMIRANTE BROWN|    281

Lo siguiente que vamos a hacer es trabajar con las columnas de fechas en los datasets que contengan fecha en formato `yyyy-mm-dd`. Vamos a crear 3 columnas nuevas, una para el año, otra para el mes y otra para el día.

In [None]:
date_split_2020 = F.split(df_2020["FECHA_TRANSPORTE"],'-')

final_df_2020 = df_2020.withColumn("Dia",date_split_2020.getItem(2))
final_df_2020 = final_df_2020.withColumn("Mes",date_split_2020.getItem(1))
final_df_2020 = final_df_2020.withColumn("Año",date_split_2020.getItem(0))

final_df_2020 = final_df_2020.drop("FECHA_TRANSPORTE")

In [None]:
final_df_2020.show()

+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+---+---+----+
|      NOMBRE_EMPRESA|           LINEA|AMBA|TIPO_TRANSPORTE|JURISDICCION|   PROVINCIA|         MUNICIPIO|CANTIDAD|DATO_PRELIMINAR|Dia|Mes| Año|
+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+---+---+----+
|  EMPRESA BATAN S.A.|BS_AS_LINEA 715M|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    2154|             NO| 01| 01|2020|
|COMPAÑIA  DE TRAN...| BS_AS_LINEA_326|  SI|      COLECTIVO|  PROVINCIAL|BUENOS AIRES|                SN|    1492|             NO| 01| 01|2020|
|EMPRESA DE TRANSP...| BS_AS_LINEA_512|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1889|             NO| 01| 01|2020|
|AUTOBUSES BUENOS ...| BS_AS_LINEA_514|  SI|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|   ALMIRANTE BROWN|    4669|             NO| 01| 0

In [None]:
date_split_2021 = F.split(df_2021["FECHA_TRANSPORTE"],'-')

final_df_2021 = df_2021.withColumn("Dia",date_split_2021.getItem(2))
final_df_2021 = final_df_2021.withColumn("Mes",date_split_2021.getItem(1))
final_df_2021 = final_df_2021.withColumn("Año",date_split_2021.getItem(0))

final_df_2021 = final_df_2021.drop("FECHA_TRANSPORTE")

In [None]:
final_df_2021.show()

+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+---+---+----+
|      NOMBRE_EMPRESA|           LINEA|AMBA|TIPO_TRANSPORTE|JURISDICCION|   PROVINCIA|         MUNICIPIO|CANTIDAD|DATO_PRELIMINAR|Dia|Mes| Año|
+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+---+---+----+
|  EMPRESA BATAN S.A.|BS_AS_LINEA 715M|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1466|             NO| 01| 01|2021|
|COMPAÑIA  DE TRAN...| BS_AS_LINEA_326|  SI|      COLECTIVO|  PROVINCIAL|BUENOS AIRES|                SN|     625|             NO| 01| 01|2021|
|EMPRESA DE TRANSP...| BS_AS_LINEA_512|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1119|             NO| 01| 01|2021|
|AUTOBUSES BUENOS ...| BS_AS_LINEA_514|  SI|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|   ALMIRANTE BROWN|    2813|             NO| 01| 0

In [None]:
date_split_sub_2021 = F.split(df_subsidio_2021["FECHA_PAGO"],'-')

final_df_sub_2021 = df_subsidio_2021.withColumn("Dia",date_split_sub_2021.getItem(2))
final_df_sub_2021 = final_df_sub_2021.withColumn("Mes",date_split_sub_2021.getItem(1))
final_df_sub_2021 = final_df_sub_2021.withColumn("Año",date_split_sub_2021.getItem(0))

final_df_sub_2021 = final_df_sub_2021.drop("FECHA_PAGO")

In [None]:
final_df_sub_2021.show()

+------------+---------+---+---+----+
|   PROVINCIA|    TOTAL|Dia|Mes| Año|
+------------+---------+---+---+----+
|BUENOS AIRES|113585263| 10| 01|2021|
|BUENOS AIRES|113585263| 10| 02|2021|
|BUENOS AIRES|113585263| 10| 03|2021|
|BUENOS AIRES|113585263| 10| 04|2021|
|BUENOS AIRES|113585263| 10| 05|2021|
|BUENOS AIRES|113585263| 10| 06|2021|
|BUENOS AIRES|113585263| 10| 07|2021|
|BUENOS AIRES|113585263| 10| 08|2021|
|BUENOS AIRES|113585263| 10| 09|2021|
|BUENOS AIRES|113585263| 10| 10|2021|
|BUENOS AIRES|113585263| 10| 11|2021|
|BUENOS AIRES|113585263| 10| 12|2021|
|     MENDOZA|181728248| 10| 01|2021|
|     MENDOZA|181728248| 10| 02|2021|
|     MENDOZA|181728248| 10| 03|2021|
|     MENDOZA|181728248| 10| 04|2021|
|     MENDOZA|181728248| 10| 05|2021|
|     MENDOZA|181728248| 10| 06|2021|
|     MENDOZA|181728248| 10| 07|2021|
|     MENDOZA|181728248| 10| 08|2021|
+------------+---------+---+---+----+
only showing top 20 rows



## **Agregar valor a los datos**

Los datos por sí solos no nos dicen nada, debemos trabajar con ellos para tomar decisiones. Es por eso que podemos realizar algunas consultas similares a SQL gracias a la librería `pyspark.sql`

### Total de transacciones por Provincia, Municipio y Fecha en 2020

Para conocer esto debemos agrupar el dataframe con las transacciones del 2020 por las columnas `PROVINCIA`,`MUNICIPIO`,`DIA`,`MES` y `AÑO`.
Luego sumamos todas las transacciones de la columna `CANTIDAD`, a esta columna con la suma la renombramos como `TRANSACCIONES` y ordenamos de mayor a menor por dicha columna.

In [None]:
trans_provincia_2020_df =(final_df_2020 \
  .groupBy('PROVINCIA','MUNICIPIO','DIA','MES','AÑO') \
  .sum('CANTIDAD')\
  .sort('sum(CANTIDAD)',ascending=False)
  )
trans_provincia_2020_df = trans_provincia_2020_df.withColumnRenamed("sum(CANTIDAD)","TRANSACCIONES")
trans_provincia_2020_df.show()

+------------+---------+---+---+----+-------------+
|   PROVINCIA|MUNICIPIO|DIA|MES| AÑO|TRANSACCIONES|
+------------+---------+---+---+----+-------------+
|BUENOS AIRES|       SN| 13| 03|2020|      3396595|
|BUENOS AIRES|       SN| 12| 03|2020|      3395057|
|BUENOS AIRES|       SN| 10| 03|2020|      3315328|
|BUENOS AIRES|       SN| 06| 03|2020|      3308267|
|BUENOS AIRES|       SN| 05| 03|2020|      3181077|
|BUENOS AIRES|       SN| 09| 03|2020|      3109171|
|BUENOS AIRES|       SN| 04| 03|2020|      3048847|
|BUENOS AIRES|       SN| 03| 03|2020|      3037490|
|BUENOS AIRES|       SN| 02| 03|2020|      3014187|
|BUENOS AIRES|       SN| 28| 02|2020|      2939814|
|BUENOS AIRES|       SN| 21| 02|2020|      2931693|
|BUENOS AIRES|       SN| 19| 02|2020|      2897820|
|BUENOS AIRES|       SN| 14| 02|2020|      2893471|
|BUENOS AIRES|       SN| 26| 02|2020|      2883380|
|BUENOS AIRES|       SN| 20| 02|2020|      2839463|
|BUENOS AIRES|       SN| 12| 02|2020|      2830251|
|BUENOS AIRE

### Total de transacciones por Provincia, Municipio y Fecha en 2021

Procedemos igual que con el dataframe del 2020, agrupamos por las columnas `PROVINCIA`,`MUNICIPIO`,`DIA`,`MES` y `AÑO`.
También sumamos todas las transacciones de la columna `CANTIDAD`, y la renombramos como `TRANSACCIONES` y ordenamos de mayor a menor por dicha columna.

In [None]:
trans_provincia_2021_df =(final_df_2021 \
  .groupBy('PROVINCIA','MUNICIPIO','DIA','MES','AÑO') \
  .sum('CANTIDAD')\
  .sort('sum(CANTIDAD)',ascending=False)
  )
trans_provincia_2021_df = trans_provincia_2021_df.withColumnRenamed("sum(CANTIDAD)","TRANSACCIONES")
trans_provincia_2021_df.show()

+------------+---------+---+---+----+-------------+
|   PROVINCIA|MUNICIPIO|DIA|MES| AÑO|TRANSACCIONES|
+------------+---------+---+---+----+-------------+
|BUENOS AIRES|       SN| 12| 11|2021|      3110362|
|BUENOS AIRES|       SN| 19| 11|2021|      3108702|
|BUENOS AIRES|       SN| 05| 11|2021|      3096851|
|BUENOS AIRES|       SN| 07| 12|2021|      3087722|
|BUENOS AIRES|       SN| 17| 11|2021|      3076021|
|BUENOS AIRES|       SN| 10| 11|2021|      3074143|
|BUENOS AIRES|       SN| 03| 12|2021|      3074024|
|BUENOS AIRES|       SN| 18| 11|2021|      3048279|
|BUENOS AIRES|       SN| 26| 11|2021|      3044062|
|BUENOS AIRES|       SN| 11| 11|2021|      3043134|
|BUENOS AIRES|       SN| 09| 11|2021|      3019042|
|BUENOS AIRES|       SN| 15| 10|2021|      3007216|
|BUENOS AIRES|       SN| 09| 12|2021|      2993182|
|BUENOS AIRES|       SN| 17| 12|2021|      2987174|
|BUENOS AIRES|       SN| 22| 12|2021|      2970460|
|BUENOS AIRES|       SN| 04| 11|2021|      2964840|
|BUENOS AIRE

### Total de subsidios por provincia en 2021

Si quisieramos conocer el total de subsidios al transporte recibidos durante el 2021, podemos agrupar por la columna `PROVINCIA` y realizar la sumatoria de la columna `TOTAL`. Luego la renombramos como `TOTAL`. Para establecer un ranking, ordenamos de mayor a menor.

In [None]:
total_subsidios_df =(final_df_sub_2021 \
  .groupBy('PROVINCIA') \
  .sum('TOTAL')\
  .sort('sum(TOTAL)',ascending=False)
  )
total_subsidios_df = total_subsidios_df.withColumnRenamed("sum(TOTAL)","TOTAL")
total_subsidios_df.show(total_subsidios_df.count())

+-------------------+----------+
|          PROVINCIA|     TOTAL|
+-------------------+----------+
|            CORDOBA|3542551656|
|           SANTA FE|2714865252|
|            MENDOZA|2180738976|
|            TUCUMÁN|1831142376|
|       BUENOS AIRES|1363023156|
|              SALTA|1179779856|
|           MISIONES| 949205412|
|              JUJUY| 754093512|
|           SAN JUAN| 645703188|
|         ENTRE RÍOS| 620872080|
|SANTIAGO DEL ESTERO| 618501984|
|         CORRIENTES| 453287892|
|              CHACO| 450484896|
|             CHUBUT| 377476368|
|            NEUQUÉN| 361999596|
|           SAN LUIS| 305012736|
|          RÍO NEGRO| 267163752|
|          CATAMARCA| 223889892|
|           LA RIOJA| 139131012|
|            FORMOSA| 119908212|
|           LA PAMPA|  66208020|
|         SANTA CRUZ|  38924172|
|   TIERRA DEL FUEGO|  34678296|
+-------------------+----------+



Podemos observar que `CORDOBA` es la provincia que mayor dinero recibió en concepto de subsidios al transporte público de pasajeros.

### Top de subsidios por habitante por provincia en 2021

Si necesitamos conocer el monto que cada provincia subsidia en transporte por cada habitante. Debemos realizar una operación `.join()`, entre los dataframes `total_subsidios_df` calculado en el paso anterior con el dataframe `df_poblacion_2010` que cuenta con la población de cada provincia según el censo 2010. Los parametros para el join serán las columnas `PROVINCIA` y `PROVINCIA_NOMBRE` de cada dataframe y el parametro `inner` ya que queremos realizar un inner join.
Para calcular el subsidio por habitante, dividimos la columna `TOTAL` con la columna `POBLACION_TOTAL`, a esa columna resultante la renombramos como `SUBSIDIO_POR_HABITANTE`. Luego ordenamos el dataframe de mayor a menor por dicha columna.

In [None]:
df_sub_x_hab =(total_subsidios_df \
  .join(df_poblacion_2010,(total_subsidios_df["PROVINCIA"]==df_poblacion_2010["PROVINCIA_NOMBRE"]),'inner'))

df_sub_x_hab = df_sub_x_hab.withColumn("SUBSIDIO_POR_HABITANTE",df_sub_x_hab["TOTAL"]/df_sub_x_hab["POBLACION_TOTAL"])

df_sub_x_hab = df_sub_x_hab.select("PROVINCIA","TOTAL","POBLACION_TOTAL","SUBSIDIO_POR_HABITANTE").sort("SUBSIDIO_POR_HABITANTE",ascending=False)
df_sub_x_hab.show(25)

+-------------------+----------+---------------+----------------------+
|          PROVINCIA|     TOTAL|POBLACION_TOTAL|SUBSIDIO_POR_HABITANTE|
+-------------------+----------+---------------+----------------------+
|            TUCUMÁN|1831142376|        1448188|    1264.4369211732178|
|            MENDOZA|2180738976|        1738929|     1254.070163876731|
|              JUJUY| 754093512|         673307|    1119.9846607862387|
|            CORDOBA|3542551656|        3308876|    1070.6208561457124|
|              SALTA|1179779856|        1214441|     971.4591783380173|
|           SAN JUAN| 645703188|         681055|     948.0925740211877|
|           MISIONES| 949205412|        1101593|     861.6661616404607|
|           SANTA FE|2714865252|        3194537|     849.8462381246484|
|             CHUBUT| 377476368|         509108|     741.4465457231079|
|SANTIAGO DEL ESTERO| 618501984|         874006|     707.6633158124772|
|           SAN LUIS| 305012736|         432310|      705.541708

Podemos observar que `TUCUMÁN` es la provincia que mas subsidia el boleto de `COLECTIVO` de cada habitante.

### Top 4 de Transacciones por Fecha por Provincia y Municipio en 2020

También podemos aplicar operaciónes de ventana(windowing) que son muy útiles para elaborar rankings entre otros usos muy útiles.
Por ejemplo, queremos conocer el top 4 de transacciones por `DIA`,`MES`,`AÑO`, `PROVINCIA`,`MUNICIPIO` y `TRANSACCIONES`.
Para ello, particionamos por las columnas `DIA`,`MES` y `AÑO` al dataframe con el total de transacciones en 2020 y las ordenamos de mayor a menor por `TRANSACCIONES`. Creamos una nueva columna llamada `ranking` con el numero de puesto en dicho ranking y luego filtramos las primeras 4 de cada ventana.

In [None]:
from pyspark.sql.window import Window
columns_2020 = ["DIA","MES","AÑO"]
w = Window.partitionBy(
columns_2020)\
    .orderBy(trans_provincia_2020_df["TRANSACCIONES"]\
             .desc())
df_ranked = trans_provincia_2020_df.select("PROVINCIA","MUNICIPIO","TRANSACCIONES",
                                           "DIA","MES","AÑO",
                                           F.rank().over(w).alias("ranking"))
df_top_4_2020 = df_ranked.filter(df_ranked["ranking"]<=4)
df_top_4_2020.show()

+------------+--------------------+-------------+---+---+----+-------+
|   PROVINCIA|           MUNICIPIO|TRANSACCIONES|DIA|MES| AÑO|ranking|
+------------+--------------------+-------------+---+---+----+-------+
|BUENOS AIRES|                  SN|       711254| 22| 04|2020|      1|
|BUENOS AIRES|          LA MATANZA|        68191| 22| 04|2020|      2|
|       JUJUY|SAN SALVADOR DE J...|        55959| 22| 04|2020|      3|
|    SAN JUAN|                  SN|        51998| 22| 04|2020|      4|
|BUENOS AIRES|                  SN|      3308267| 06| 03|2020|      1|
|BUENOS AIRES|  GENERAL PUEYRREDON|       343456| 06| 03|2020|      2|
|    SAN JUAN|                  SN|       323854| 06| 03|2020|      3|
|BUENOS AIRES|          LA MATANZA|       251202| 06| 03|2020|      4|
|BUENOS AIRES|                  SN|       626144| 01| 08|2020|      1|
|    SAN JUAN|                  SN|        83223| 01| 08|2020|      2|
|     MENDOZA|                  SN|        70786| 01| 08|2020|      3|
|BUENO

### Top 4 de Transacciones por Fecha por Provincia y Municipio en 2021

Procedemos de manera similar con el dataframe de las transacciones del 2021 `trans_provincia_2021` para elaborar el top 4.

In [None]:
columns_2021 = ["DIA","MES","AÑO"]
t = Window.partitionBy(
  columns_2021)\
    .orderBy(trans_provincia_2021_df["TRANSACCIONES"]\
             .desc())
df_ranked_2021 = trans_provincia_2021_df.select("PROVINCIA","MUNICIPIO","TRANSACCIONES",
                                           "DIA","MES","AÑO",
                                           F.rank().over(t).alias("ranking"))
df_top_4_2021 = df_ranked_2021.filter(df_ranked_2021["ranking"]<=4)
df_top_4_2021.show()

+------------+--------------------+-------------+---+---+----+-------+
|   PROVINCIA|           MUNICIPIO|TRANSACCIONES|DIA|MES| AÑO|ranking|
+------------+--------------------+-------------+---+---+----+-------+
|BUENOS AIRES|                  SN|      1538781| 19| 04|2021|      1|
|     MENDOZA|                  SN|       423011| 19| 04|2021|      2|
|    SAN JUAN|                  SN|       199325| 19| 04|2021|      3|
|BUENOS AIRES|  GENERAL PUEYRREDON|       160255| 19| 04|2021|      4|
|BUENOS AIRES|                  SN|      1419267| 06| 02|2021|      1|
|     MENDOZA|                  SN|       260625| 06| 02|2021|      2|
|BUENOS AIRES|  GENERAL PUEYRREDON|       116484| 06| 02|2021|      3|
|BUENOS AIRES|              MORENO|        98379| 06| 02|2021|      4|
|BUENOS AIRES|                  SN|       855926| 07| 03|2021|      1|
|     MENDOZA|                  SN|       150820| 07| 03|2021|      2|
|BUENOS AIRES|  GENERAL PUEYRREDON|        78539| 07| 03|2021|      3|
|BUENO

### Listar la tercera `LINEA` de `COLECTIVO` que mas transacciones tuvo por `PROVINCIA` y por `DIA`,`MES` y `AÑO` durante 2020 y 2021

Para resolver esto primero vamos a proceder a unir a los dos dataframes del 2020 y del 2021. Aplicamos el método .`union()`. Luego mostramos el dataframe resultante con `.show()`.

In [None]:
df_2020_2021 = final_df_2021.union(final_df_2020)
df_2020_2021.show()

+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+---+---+----+
|      NOMBRE_EMPRESA|           LINEA|AMBA|TIPO_TRANSPORTE|JURISDICCION|   PROVINCIA|         MUNICIPIO|CANTIDAD|DATO_PRELIMINAR|Dia|Mes| Año|
+--------------------+----------------+----+---------------+------------+------------+------------------+--------+---------------+---+---+----+
|  EMPRESA BATAN S.A.|BS_AS_LINEA 715M|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1466|             NO| 01| 01|2021|
|COMPAÑIA  DE TRAN...| BS_AS_LINEA_326|  SI|      COLECTIVO|  PROVINCIAL|BUENOS AIRES|                SN|     625|             NO| 01| 01|2021|
|EMPRESA DE TRANSP...| BS_AS_LINEA_512|  NO|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|GENERAL PUEYRREDON|    1119|             NO| 01| 01|2021|
|AUTOBUSES BUENOS ...| BS_AS_LINEA_514|  SI|      COLECTIVO|   MUNICIPAL|BUENOS AIRES|   ALMIRANTE BROWN|    2813|             NO| 01| 0

Después procedemos a agrupar el dataframe `df_2020_2021` por las columnas `DIA`,`MES`,`AÑO`,`LINEA` y `PROVINCIA`.
Realizamos la sumatoria de la columna `CANTIDAD` y la renombramos como `TRANSACCIONES`. Luego ordenamos de manera descendente por dicha columna. Luego vemos el resultado con `.show()`

In [None]:
df_2020_2021_agrupado =(df_2020_2021 \
  .groupBy('DIA','MES','AÑO','LINEA','PROVINCIA') \
  .sum("CANTIDAD") \
  .sort('sum(CANTIDAD)',ascending=False))

df_2020_2021_final = df_2020_2021_agrupado.withColumnRenamed("sum(CANTIDAD)","TRANSACCIONES")
df_2020_2021_final.show()

+---+---+----+---------------+------------+-------------+
|DIA|MES| AÑO|          LINEA|   PROVINCIA|TRANSACCIONES|
+---+---+----+---------------+------------+-------------+
| 13| 03|2020|BSAS_LINEA_501G|BUENOS AIRES|       194098|
| 10| 03|2020|BSAS_LINEA_501G|BUENOS AIRES|       189299|
| 12| 03|2020|BSAS_LINEA_501G|BUENOS AIRES|       187842|
| 06| 03|2020|BSAS_LINEA_501G|BUENOS AIRES|       187382|
| 05| 03|2020|BSAS_LINEA_501G|BUENOS AIRES|       180607|
| 05| 11|2021|BSAS_LINEA_501G|BUENOS AIRES|       178984|
| 12| 11|2021|BSAS_LINEA_501G|BUENOS AIRES|       178398|
| 19| 11|2021|BSAS_LINEA_501G|BUENOS AIRES|       178167|
| 10| 11|2021|BSAS_LINEA_501G|BUENOS AIRES|       177297|
| 07| 12|2021|BSAS_LINEA_501G|BUENOS AIRES|       176011|
| 17| 11|2021|BSAS_LINEA_501G|BUENOS AIRES|       174850|
| 18| 11|2021|BSAS_LINEA_501G|BUENOS AIRES|       174693|
| 09| 11|2021|BSAS_LINEA_501G|BUENOS AIRES|       174231|
| 26| 11|2021|BSAS_LINEA_501G|BUENOS AIRES|       174197|
| 17| 12|2021|

Finalmente, particionamos por las columnas `DIA`,`MES` y `AÑO` y ordenamos de manera descendente por `TRANSACCIONES`. Luego filtramos las líneas de colectivo que ocuparon el puesto 3 en `ranking`. Después vemos el resultado con `.show()`.

In [None]:
c = ["DIA","MES","AÑO"]
x = Window.partitionBy(
  c)\
    .orderBy(df_2020_2021_final["TRANSACCIONES"]\
             .desc())
ranking_3_linea = df_2020_2021_final.withColumn("ranking",F.rank().over(x).alias("ranking"))
ranking_3_linea = ranking_3_linea.filter(ranking_3_linea["ranking"]==3)
ranking_3_linea.show()

+---+---+----+--------------+------------+-------------+-------+
|DIA|MES| AÑO|         LINEA|   PROVINCIA|TRANSACCIONES|ranking|
+---+---+----+--------------+------------+-------------+-------+
| 19| 04|2021|BSAS_LINEA_365|BUENOS AIRES|        63542|      3|
| 22| 04|2020|BSAS_LINEA_365|BUENOS AIRES|        30347|      3|
| 06| 02|2021|BSAS_LINEA_440|BUENOS AIRES|        55572|      3|
| 06| 03|2020|BSAS_LINEA_440|BUENOS AIRES|       107817|      3|
| 07| 03|2021|BSAS_LINEA_365|BUENOS AIRES|        36027|      3|
| 10| 02|2021|BSAS_LINEA_365|BUENOS AIRES|        65058|      3|
| 29| 05|2021|BSAS_LINEA_365|BUENOS AIRES|        32149|      3|
| 01| 08|2020|BSAS_LINEA_365|BUENOS AIRES|        24519|      3|
| 05| 09|2021|BSAS_LINEA_365|BUENOS AIRES|        38621|      3|
| 08| 03|2021|BSAS_LINEA_365|BUENOS AIRES|        80905|      3|
| 08| 02|2021|BSAS_LINEA_365|BUENOS AIRES|        69398|      3|
| 12| 12|2021|BSAS_LINEA_365|BUENOS AIRES|        42534|      3|
| 14| 10|2020|BSAS_LINEA_

## **Guardar los datos para su análisis/procesamiento posterior**

El último paso de nuestra limpieza de datos es guardar el marco de datos limpiado en un tipo de archivo. Si planeas hacer cualquier análisis o procesamiento posterior usando Spark, es muy recomendable que uses Parquet. Hay otras opciones disponibles según tus necesidades, pero Spark está optimizado para aprovechar Parquet.

Hay dos opciones que utilizamos para el método `.write.parquet()`:

- La ruta de donde escribir el archivo
- Un parámetro opcional `mode`, que hemos establecido como `overwrite`. Esto permite a Spark escribir datos en una ubicación existente, resolviendo algunos problemas potenciales en un entorno de cuaderno.

In [None]:
# Save the data

df_clean_2020.write.parquet('/tmp/pyspark/df_clean_2020.parquet')
df_clean_2021.write.parquet('/tmp/pyspark/df_clean_2021.parquet')

df_final_2020.write.parquet('/tmp/pyspark/df_final_2020.parquet')
df_final_2021.write.parquet('/tmp/pyspark/df_final_2021.parquet')

df_2020.write.parquet('/tmp/pyspark/df_2020.parquet')
df_2021.write.parquet('/tmp/pyspark/df_2021.parquet')

final_df_2020.write.parquet('/tmp/pyspark/final_df_2020.parquet')

final_df_2021.write.parquet('/tmp/pyspark/final_df_2021.parquet')


final_df_sub_2021.write.parquet('/tmp/pyspark/final_df_sub_2021.parquet')


trans_provincia_2020_df.write.parquet('/tmp/pyspark/trans_provincia_2020_df.parquet')
trans_provincia_2021_df.write.parquet('/tmp/pyspark/trans_provincia_2021_df.parquet')

total_subsidios_df.write.parquet('/tmp/pyspark/total_subsidios_df.parquet')

df_sub_x_hab.write.parquet('/tmp/pyspark/df_sub_x_hab.parquet')

df_top_4_2020.write.parquet('/tmp/pyspark/df_top_4_2020.parquet')
df_top_4_2021.write.parquet('/tmp/pyspark/df_top_4_2021.parquet')

df_2020_2021.write.parquet('/tmp/pyspark/df_2020_2021.parquet')
df_2020_2021_final.write.parquet('/tmp/pyspark/df_2020_2021_final.parquet')

ranking_3_linea.write.parquet('/tmp/pyspark/ranking_3_linea.parquet')


Veamos ahora el contenido utilizando el comando de shell `ls`. Notarás que la ubicación `/tmp/pyspark/df_clean_2020.parquet` es en realidad un directorio, no sólo un archivo. Esto se debe a la forma en que Spark maneja su asignación de datos y su formato. 

In [None]:
# Is file in directory?
!ls /tmp/pyspark/df_clean_2020.parquet

part-00000-97a2c9cc-c7ff-4d75-a30f-ef4e8cca535d-c000.snappy.parquet  _SUCCESS
part-00001-97a2c9cc-c7ff-4d75-a30f-ef4e8cca535d-c000.snappy.parquet


Tenga en cuenta que, por lo general, al procesar los datos en Spark, querrá utilizar el formato Parquet, como se ha descrito anteriormente. Esto es genial para cualquier procesamiento o análisis posterior que planee hacer en Spark. Sin embargo, puede ser difícil leer los archivos Parquet fuera de Spark sin un trabajo extra. Por ello, vamos a crear una versión en formato CSV que podrás descargar si lo deseas.

Tendremos que realizar cuatro pasos para esta operación:

- Combinar los datos en un solo archivo utilizando la transformación `.coalesce(1)`. Spark normalmente mantiene los datos en archivos separados para mejorar el rendimiento y evitar problemas de RAM. Nuestro conjunto de datos es pequeño y podemos evitar estos problemas.
- Utilizaremos el método `.write.csv()` (en lugar de `.write.parquet()`). También tenemos que definir un componente `header=True` para que nuestras columnas se nombren correctamente.
- Por último, utilizaremos un comando especial específico para el entorno del cuaderno para descargar el archivo.

Como has visto en Spark, puedes encadenar comandos. Como tal, combinaremos los dos primeros componentes.

In [None]:
# Coalesce and save the data in CSV format

df_clean_2020.coalesce(1).write.csv('/tmp/pyspark/df_clean_2020.csv')
df_clean_2021.coalesce(1).write.csv('/tmp/pyspark/df_clean_2021.csv')

df_final_2020.coalesce(1).write.csv('/tmp/pyspark/df_final_2020.csv')
df_final_2021.coalesce(1).write.csv('/tmp/pyspark/df_final_2021.csv')

df_2020.coalesce(1).write.csv('/tmp/pyspark/df_2020.csv')
df_2021.coalesce(1).write.csv('/tmp/pyspark/df_2021.csv')

final_df_2020.coalesce(1).write.csv('/tmp/pyspark/final_df_2020.csv')

final_df_2021.coalesce(1).write.csv('/tmp/pyspark/final_df_2021.csv')


final_df_sub_2021.coalesce(1).write.csv('/tmp/pyspark/final_df_sub_2021.csv')


trans_provincia_2020_df.coalesce(1).write.csv('/tmp/pyspark/trans_provincia_2020_df.csv')
trans_provincia_2021_df.coalesce(1).write.csv('/tmp/pyspark/trans_provincia_2021_df.csv')

total_subsidios_df.coalesce(1).write.csv('/tmp/pyspark/total_subsidios_df.csv')

df_sub_x_hab.coalesce(1).write.csv('/tmp/pyspark/df_sub_x_hab.csv')

df_top_4_2020.coalesce(1).write.csv('/tmp/pyspark/df_top_4_2020.csv')
df_top_4_2021.coalesce(1).write.csv('/tmp/pyspark/df_top_4_2021.csv')

df_2020_2021.coalesce(1).write.csv('/tmp/pyspark/df_2020_2021.csv')
df_2020_2021_final.coalesce(1).write.csv('/tmp/pyspark/df_2020_2021_final.csv')

ranking_3_linea.coalesce(1).write.csv('/tmp/pyspark/ranking_3_linea.csv')

In [None]:
# Look at the output of the command using the shell command `ls`
!ls /tmp/pyspark/

df_2020_2021.csv	    df_top_4_2020.csv
df_2020_2021_final.csv	    df_top_4_2020.parquet
df_2020_2021_final.parquet  df_top_4_2021.csv
df_2020_2021.parquet	    df_top_4_2021.parquet
df_2020.csv		    final_df_2020.csv
df_2020.parquet		    final_df_2020.parquet
df_2021.csv		    final_df_2021.csv
df_2021.parquet		    final_df_2021.parquet
df_clean_2020.csv	    final_df_sub_2021.csv
df_clean_2020.parquet	    final_df_sub_2021.parquet
df_clean_2021.csv	    ranking_3_linea.csv
df_clean_2021.parquet	    ranking_3_linea.parquet
df_final_2020.csv	    total_subsidios_df.csv
df_final_2020.parquet	    total_subsidios_df.parquet
df_final_2021.csv	    trans_provincia_2020_df.csv
df_final_2021.parquet	    trans_provincia_2020_df.parquet
df_sub_x_hab.csv	    trans_provincia_2021_df.csv
df_sub_x_hab.parquet	    trans_provincia_2021_df.parquet


In [None]:
# Rename the data file

df_clean_2020.coalesce(1).write.option('header','true').csv('/tmp/pyspark/df_clean_2020.csv',mode='overwrite')
df_clean_2021.coalesce(1).write.option('header','true').csv('/tmp/pyspark/df_clean_2021.csv',mode='overwrite')

df_final_2020.coalesce(1).write.option('header','true').csv('/tmp/pyspark/df_final_2020.csv',mode='overwrite')
df_final_2021.coalesce(1).write.option('header','true').csv('/tmp/pyspark/df_final_2021.csv',mode='overwrite')

df_2020.coalesce(1).write.option('header','true').csv('/tmp/pyspark/df_2020.csv',mode='overwrite')
df_2021.coalesce(1).write.option('header','true').csv('/tmp/pyspark/df_2021.csv',mode='overwrite')

In [None]:
final_df_2020.coalesce(1).write.option('header','true').csv('/tmp/pyspark/final_df_2020.csv',mode='overwrite')

final_df_2021.coalesce(1).write.option('header','true').csv('/tmp/pyspark/final_df_2021.csv',mode='overwrite')


final_df_sub_2021.coalesce(1).write.option('header','true').csv('/tmp/pyspark/final_df_sub_2021.csv',mode='overwrite')


trans_provincia_2020_df.coalesce(1).write.option('header','true').csv('/tmp/pyspark/trans_provincia_2020_df.csv',mode='overwrite')
trans_provincia_2021_df.coalesce(1).write.option('header','true').csv('/tmp/pyspark/trans_provincia_2021_df.csv',mode='overwrite')

total_subsidios_df.coalesce(1).write.option('header','true').csv('/tmp/pyspark/total_subsidios_df.csv',mode='overwrite')

df_sub_x_hab.coalesce(1).write.option('header','true').csv('/tmp/pyspark/df_sub_x_hab.csv',mode='overwrite')

df_top_4_2020.coalesce(1).write.option('header','true').csv('/tmp/pyspark/df_top_4_2020.csv',mode='overwrite')
df_top_4_2021.coalesce(1).write.option('header','true').csv('/tmp/pyspark/df_top_4_2021.csv',mode='overwrite')

df_2020_2021.coalesce(1).write.option('header','true').csv('/tmp/pyspark/df_2020_2021.csv',mode='overwrite')
df_2020_2021_final.coalesce(1).write.option('header','true').csv('/tmp/pyspark/df_2020_2021_final.csv',mode='overwrite')

ranking_3_linea.coalesce(1).write.option('header','true').csv('/tmp/pyspark/ranking_3_linea.csv',mode='overwrite')

## **Finalizar la sesión de Spark**

Luego de haber trabajado con los datos procedemos a finalizar la sesión.

In [None]:
spark.stop()