<a href="https://colab.research.google.com/github/JhonFiUNFV/python_prep/blob/master/SparkSQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1.- Configuracion Entorno Spark

In [None]:
# Descargamos spark con hadoop y Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# Seteamos las variables de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [None]:
# Creamos la conexion a Spark 
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
spark

# 2.- Importacion de Librerias

In [None]:
# Librerias Spark
from pyspark.sql import functions as F

#Librerias Python
import pandas as pd

# 3.- Importacion de Datos

In [None]:
# Usamos la librería files de colab
from google.colab import files
uploaded = files.upload()

# También se puede importar desde el mismo navegador

## 3.1. Importacion desde un csv

Para importar datos desde un csv utilizamos el comando **spark.read.csv**

Sintaxis más utilizada:
**dfsSpark = spark.read.csv(DATA_PATH + file_name, sep = ',', header=True, inferSchema=True)**

In [None]:
help(spark.read.csv)

Ejercicio 1:
Importar el fichero **salario.csv** y ponerle de nombre dfsSalario

In [None]:
# Insertar codigo aqui
dfsCSV = spark.read.csv('2015-summary.csv', sep = ',', header=True, inferSchema=True)
dfsCSV.show(5)

## 3.2. Importacion desde un json

Para importar datos desde un csv utilizamos el comando **spark.read.json**

Sintaxis más utilizada:
**dfsSpark = spark.read.json(DATA_PATH + file_name)**

In [None]:
help(spark.read.json)

In [None]:
# Importando desde json
dfsJson = spark.read.json('2015-summary.json')
dfsJson.show(5)

## 3.3. Importacion desde un txt

Para importar datos desde un csv utilizamos el comando **spark.read.text**

Sintaxis más utilizada:
**dfsSpark = spark.read.text(DATA_PATH + file_name)**

In [None]:
help(spark.read.text)

In [None]:
# Importando desde txt
dfsTxt = spark.read.text('salario.txt')
dfsTxt.show(5)

## 3.4. Importacion desde un parquet

Para importar datos desde un csv utilizamos el comando **spark.read.parquet**

Sintaxis más utilizada:
**dfsSpark = spark.read.parquet(DATA_PATH + file_name)**

In [None]:
help(spark.read.parquet)

In [None]:
# Importando desde parquet
dfsParquet = spark.read.parquet('part-r-00000-1a9822ba-b8fb-4d8e-844a-ea30d0801b9e.gz.parquet')
dfsParquet.show(5)

## 3.5. Importacion desde un pandas Dataframe

Para importar datos desde un csv utilizamos el comando **spark.createDataFrame()**

Sintaxis más utilizada:
**dfsSpark = spark.createDataFrame(pandasDataframe)**

In [None]:
help(spark.createDataFrame)

In [None]:
# Importando desde csv
dfpMorosidad = pd.read_csv('morosidad.csv')
dfpMorosidad.head(5)

In [None]:
# Verificando tipo de dato
type(dfpMorosidad)

In [None]:
# Convirtiendo a Spark Dataframe
dfsMorosidad = spark.createDataFrame(dfpMorosidad)
dfsMorosidad.show(5)

In [None]:
# Verificando tipo de dato
type(dfsMorosidad)

# 4.- Información básica de DataFrames



## 4.1. Previsualización

**show** es un método que muestra por pantalla _n_ filas del DataFrame.

In [None]:
dfsCSV.show(10)



## 4.2. Dimensiones

En Spark, no existe un método *shape*, por lo que hay que contar por separados las filas y las columnas.

In [None]:
# Contando el numero de filas
dfsCSV.count()



`columns` es un atributo que contiene los nombres de las columnas del DataFrame.

In [None]:
dfsMorosidad.columns

In [None]:
len(dfsMorosidad.columns)



## 4.3. Schema

El schema de un dataframe nos muestra como se interpretaran los datos. Esto no significa que los datos estén así. _schema_ es un atributo del objeto, no un método. _printSchema()_ es un método que muestra una versión más entendidible del _schema_.

In [None]:
dfsCSV.schema

In [None]:
dfsCSV.printSchema()



## 4.3. dtypes

El atributo `dtypes` contiene los nombres de las columnas del dataframe junto con su tipo. Esto permite seleccionar nombres de columnas basados en el tipo, normalmente las variables categóricas (string y boolean) tienen tratamientos distintos a las numéricas (enteras y decimales).

In [None]:
dfsCSV.dtypes

# 5.- Operaciones con Dataframes de Spark

In [None]:
dfsMorosidad.show(5)

## 5.1. Select

Filtra las columnas que deseamos mostrar

In [None]:
# Forma 1
dfsMorosidad.select('meses', 'score', 'zona').show(5)

In [None]:
# Forma 2
dfsMorosidad.select(F.col('meses'), F.col('meses') * 2, F.col('score'), F.col('zona')).show(5)

In [None]:
# Forma 3
columnas = ['meses','score','zona']
dfsMorosidad.select(columnas).show(5)

In [None]:
# Combinando las tres formas
columnas = ['meses','score','zona']
dfsMorosidad.select('ID', F.col('nivel'), *columnas).show(5)

## 5.2. Filter - Where

filtra resgistros segun cumplan la condicion.

In [None]:
dfsParquet.show(5)

In [None]:
# Forma 1
dfsParquet.where("DEST_COUNTRY_NAME = 'United States'").show(5)

In [None]:
# Forma 2
dfsParquet.filter(F.col('ORIGIN_COUNTRY_NAME') == 'India').select('ORIGIN_COUNTRY_NAME','count').show(5)

**Ejercicio 1:
Genere una consulta que liste los paises cuyo 'count' sea mayor a 25**

In [None]:
# Resolver aqui


In [None]:
# Varios valores - Forma 1
paises = ['United States', 'Egypt', 'Equatorial Guinea']
dfsParquet.where(F.col('DEST_COUNTRY_NAME').isin(paises)).show(5)

In [None]:
# Varios valores - Forma 2
dfsParquet.where("DEST_COUNTRY_NAME in ('United States', 'Egypt', 'Equatorial Guinea')").show(5)

In [None]:
# Varios valores - Forma 3
dfsParquet.where(F.col('DEST_COUNTRY_NAME').isin('United States', 'Egypt', 'Equatorial Guinea')).show(5)

**OJO**: Para negar podemos usar el operador **~**

In [None]:
# Negamos la anterior sentencia
dfsParquet.where(~F.col('DEST_COUNTRY_NAME').isin('United States', 'Egypt', 'Equatorial Guinea')).show(5)

**Ejercicio 2:
Genere una consulta que liste los paises cuyo 'ORIGIN_COUNTRY_NAME' no sea Romania, Ireland ni United States**

In [None]:
# Resolver aqui




__Combinación de filtros (AND / OR)__

__AND__

In [None]:
# Forma 1
dfsParquet.where( ( F.col('DEST_COUNTRY_NAME') == 'Malta' ) & 
                 ( F.col('ORIGIN_COUNTRY_NAME') == 'United States' ) ).show(5)

In [None]:
# Forma 2
dfsParquet.where("DEST_COUNTRY_NAME = 'Malta' and ORIGIN_COUNTRY_NAME = 'United States'").show(5)

__OR__

In [None]:
# Forma 1
dfsParquet.where((F.col('DEST_COUNTRY_NAME') == 'Malta') | (F.col('ORIGIN_COUNTRY_NAME') == 'United States')).show(5)

In [None]:
# Forma 2
dfsParquet.where("DEST_COUNTRY_NAME = 'Malta' or ORIGIN_COUNTRY_NAME = 'United States'").show(5)

**Ejercicio 3:
Genere una consulta que liste los paises cuyo 'DEST_COUNTRY_NAME' sea United States y su 'ORIGIN_COUNTRY_NAME' no sea India**

In [None]:
# Resolver aqui


## 5.3. Group By

Resume los registros en gru

In [None]:
dfsCSV.show(5)

In [None]:
# Count
dfsCSV.where('DEST_COUNTRY_NAME = "United States"').groupBy('DEST_COUNTRY_NAME').count().show(5)

In [None]:
# SUM
dfsCSV.groupBy('DEST_COUNTRY_NAME').sum('count').show(5)

In [None]:
# Combinando las dos formas
# SELECT count(*), SUM(variable)
# FROM dfsCSV
# GROUP BY DEST_COUNTRY_NAME

dfsCSV.groupBy('DEST_COUNTRY_NAME').agg(F.count('*'), F.max('count')).show(5)

**Ejercicio 4:
Genere una consulta que agrupe por 'DEST_COUNTRY_NAME' y saque el minimo, maximo y el prmedio de 'count'**

In [None]:
# Resuelva aqui


## 5.4. Sort - OrderBy

Ordena los registros segun columna.

In [None]:
dfsMorosidad.show(5)

In [None]:
dfsMorosidad.orderBy(F.col('edad').desc()).show(5)

In [None]:
dfsMorosidad.sort(F.col('ingreso').asc()).show(5)

In [None]:
dfsMorosidad.sort(F.col('tipo_vivienda').asc(), F.col('ingreso').desc()).show(5)

## Funciones Extras

### Clausula When

In [None]:
dfsCSV.select('count', F.when(F.col('count') < 25, "Menores a 25").otherwise("Mayores a 25")).show()

In [None]:
dfsCSV.select('DEST_COUNTRY_NAME', F.when(F.col('count') < 25, "Menores a 25").when(F.col('count') < 50, "Menores a 50").otherwise("Mayores a 100")).show()

### Clausula Like

In [None]:
dfsCSV.select("DEST_COUNTRY_NAME", F.col("DEST_COUNTRY_NAME").like("%Egy%")).show()

### Clausula Startswith - Endswith

In [None]:
dfsCSV.select("DEST_COUNTRY_NAME", F.col("DEST_COUNTRY_NAME").startswith("Uni")).show()

In [None]:
dfsCSV.select("DEST_COUNTRY_NAME", F.col("DEST_COUNTRY_NAME").endswith("a")).show()

### Clausula Substring

In [None]:
dfsCSV.select("DEST_COUNTRY_NAME", (F.col("DEST_COUNTRY_NAME").substr(2,4)).alias("Nombre_Corto")).show()

### Clausula Between

In [None]:
dfsCSV.select("DEST_COUNTRY_NAME", "count", F.col("count").between(25,75)).show()

In [None]:
dfsCSV.select("DEST_COUNTRY_NAME", "count").where(F.col("count").between(25,75)).show()

### Distinct

Una llamada al método `distinct` es lo mismo que al método `dropDuplicates` sin parámetro. Es decir, tiene en cuenta todas las columnas. También se utiliza normalmente para contar los valores únicos de una columna.

In [None]:
# Usamos la funcion distinct
dfsCSV.select('DEST_COUNTRY_NAME').distinct().show(5)

### Agregando Columnas

Usamos la sentencia **withColumn()**

In [None]:
dfsCSV = dfsCSV.withColumn('Conteo', F.col('count') + 5 )
dfsCSV.show(5)

### Modificando nombres de columnas

In [None]:
dfsCSV = dfsCSV.withColumnRenamed('count', 'cuenta')
dfsCSV.show()

**Ejercicio 5:
Genere una nueva columna llamada 'INICIALES' que sean las tres primeras letras del campo 'DEST_COUNTRY_NAME'**

In [None]:
# Resuelva aqui


**Ejercicio 6:
Genere una nueva columna llamada 'FLAG_CONTEO' que si la variable cuenta es mayor a 25 entonces tome el valor de 1 caso contrario el valor 0**

In [None]:
# Resuelva aqui




### Drop

El método `drop` tiene la función contraria al `select`, elimina un subconjunto de columnas. En este caso no se puede pasar una lista de columnas, es necesario utlizar el operador `*` para convertirlo a parámetros indivuales.

**OJO:** Si se intenta eliminar una columna que no existe no devuelve error.

In [None]:
# Forma 1
dfsCSV = dfsCSV.drop('count','conteo')
dfsCSV.show(5)

In [None]:
# Forma 2
columnas = ['DEST_COUNTRY_NAME','ORIGIN_COUNTRY_NAME']
dfsCSV = dfsCSV.drop(*columnas)
dfsCSV.show(5)