In [None]:
!wget --no-cache -O init.py -q https://raw.githubusercontent.com/UDEA-Esp-Analitica-y-Ciencia-de-Datos/EACD-03-BIGDATA/master/init.py
import init; init.init(force_download=False); 
from IPython.display import Image

In [None]:
Image("local/imgs/udea-datascience.png")

In [None]:
#Instalación
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar xf spark-3.0.1-bin-hadoop3.2.tgz 
!pip install -q findspark

#Variables de Entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"

#SparkContext
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = SparkContext.getOrCreate()

#**DATA FRAME**

Se puede definir como una colección de datos distribuidos organizada por columnas y optimizada para para operar en clústers de Spark

Es un concepto similar a las tablas en las bases de datos relacionales solo que presentan características similares a los RDDs

En los Data Frames se puede acceder a los datos utilizando la librería SQL de Spark a través del SQLContext que se obtiene desde el SparkContext


In [None]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

**Crear un Data Frame**

Existen dos formas de crear un Data Frame

1. Desde un RDD: Crear un RDD y usar el método createDataFrame para obtener un Data Frame a partir del RDD

In [None]:
datos=[(1,"Juan",33,"Masculino","Ingeniero",4500000),\
       (2,"Ana",38,"Femenino","Arquitecta",6200000),\
       (3,"Carmen",52,"Femenino","Abogada",7500000)]
empleadosRDD=sc.parallelize(datos)
empleadosRDD.collect()

In [None]:
type(empleadosRDD)


In [None]:
empleadosDF=sqlCtx.createDataFrame(empleadosRDD)
empleadosDF.collect()

In [None]:
type(empleadosDF)

Los Data Frame tiene algunos métodos que facilitan la operación y visualización de los datos

In [None]:
empleadosDF.show()

Para comprender mejor los datos podemos indicar el nombre de las columnas

In [None]:
empleadosDF=sqlCtx.createDataFrame(empleadosRDD,["Id","Nombre","Edad","Sexo","Profesión","Salario"])
empleadosDF.collect()

In [None]:
empleadosDF.show()

Veamos qué tipo de dato se adoptó para cada variable

In [None]:
empleadosDF.printSchema()

En el siguiente link puede consultar los diferentes tipos de datos disponibles


http://spark.apache.org/docs/3.0.0-preview2/sql-ref-datatypes.html


La Edad y el salario los estamos usando con el mismo tipo de dato, pero en realidad podríamos optimizar si modificamos el tipo de dato de la edad

In [None]:
from pyspark.sql.types import *
empleadosDF=sqlCtx.createDataFrame(empleadosRDD, \
          StructType([ \
                      StructField("Id", ByteType(),False), \
                      StructField("Nombre", StringType(),False), \
                      StructField("Edad", ByteType(),False), \
                      StructField("Sexo", StringType(),False), \
                      StructField("Profesión", StringType(),False), \
                      StructField("Salario", IntegerType(),False)]))

In [None]:
empleadosDF.printSchema()
empleadosDF.show()

2. Crear Data Frame desde un archivo externo

**Cargar archivo csv**

In [None]:
empleadosDF = sqlCtx.read.csv("local/data/empleados.csv")
type(empleadosDF)

In [None]:
empleadosDF.show(5)

Creo el Data Frame correctamente pero no le hemos indicado que la primera línea es el header

In [None]:
empleadosDF = sqlCtx.read.option("header",True) \
     .csv("local/data/empleados.csv")
empleadosDF.show(5)

Veamos el formato asumido para cada variable

In [None]:
empleadosDF.printSchema()

Qué tal si asignamos el tipo de datos de cada variable

In [None]:
schema = StructType() \
      .add("Id",ByteType(),True) \
      .add("Nombre",StringType(),True) \
      .add("Edad",ByteType(),True) \
      .add("Sexo",StringType(),True) \
      .add("Profesión",StringType(),True) \
      .add("Salario",IntegerType(),True)


In [None]:
empleadosDF = sqlCtx.read.format("csv") \
      .option("header", True) \
      .option("delimiter", ",") \
      .schema(schema) \
      .load("local/data/empleados.csv")

In [None]:
empleadosDF.printSchema()
empleadosDF.show()

**Cargar archivo json**

In [None]:
empleadosDF = sqlCtx.read.json("local/data/empleados.json")
type(empleadosDF)

In [None]:
empleadosDF.printSchema()
empleadosDF.show()

In [None]:
empleadosDF = sqlCtx.read.json("local/data/empleados.json", \
            StructType([ \
                      StructField("Id", IntegerType(),False), \
                      StructField("Nombre", StringType(),False), \
                      StructField("Edad", ByteType(),False), \
                      StructField("Sexo", StringType(),False), \
                      StructField("Profesión", StringType(),False), \
                      StructField("Salario", IntegerType(),False)]))
type(empleadosDF)

In [None]:
empleadosDF.printSchema()
empleadosDF.show()

**Acceso a los datos**

Una gran ventaja que ofrecen los Data Frames es que podemos acceder a cada variable a través del nombre del data frame, el caracter punto(.) y el nombre de la variable

In [None]:
empleadosDF.Salario

##**Operaciones**

Sobre los Data Frames podemos realizar varios tipos de operaciones.

Vamos dos de ellas, a saber, propiedades y métodos

###**Propiedades**

Las propiedades son atributos que podemos consultar sobre nuestros Data Frames.

Veamos algunas de ellas

**columns**

Es una propiedad que se utiliza para conocer el nombre de las variables (columnas) del Data Frame

In [None]:
empleadosDF.columns

**dtypes**

Atributo que indica el tipo de dato de cada variable (columna) del Data Frame

In [None]:
empleadosDF.dtypes

**rdd**

Convierte un Data Frame a un RDD

In [None]:
empRDD=empleadosDF.rdd
type(empRDD)


###**Métodos**

Data Frame dispone de una serie de métodos que podemos aplicar sobre nuestros datos. Algunos de estos son similares a los métodos revisados para RDD.

Veamos algunos de los principales métodos disponibles en Data Frames

**agg(funciones_agregacion)**

Este método realiza una agregación sobre el data frame, es decir que a partir del data frame original, devuelve un nuevo data frame que contiene el resultado de las funciones de agregación realizadas sobre el conjunto de datos.

Estas funciones de agregación son especificadas por el usuario y hacen referencia a operaciones básicas sobre los datos. Estas son algunas de las funciones de agregación disponibles:

* first: entrega el primer elemento
* last: entrega el último elemento
* min: entrega el valor mínimo de una variable específica
* max: entrega el valor máximo de una variable específica
* sum: entrega la suma de los valores de una variable específica
* avg: entrega el promedio de los valores de una variable específica

Usemos el método agg para conocer el promedio de los salarios de los empleados

---



In [None]:
empleadosDF.show()

In [None]:
salario=empleadosDF.agg({"Salario ": "avg"}).take(1)
type(salario)

In [None]:
salario=empleadosDF.agg({"Salario": "avg"})
salario.show()

In [None]:
type(salario)

También podemos utilizar las funciones disponibles en la clase functions de la librería pyspark.sql 

In [None]:
from pyspark.sql import functions
edad=empleadosDF.agg( \
    functions.min(empleadosDF.Edad), \
    functions.max(empleadosDF.Edad), \
    functions.avg(empleadosDF.Edad))
edad.show()

In [None]:
type(edad)

En este caso utilizamos tres funciones de agregación, el resultado es un nuevo Data Frame de una sola fila y tres columnas (una por cada función de agregación)

**corr(var1,var2)**

Este método se utiliza para calcular la correlación entre un par de variables del Data Frame

In [None]:
empleadosDF.corr("Edad", "Salario")

**count()**

Se utiliza para conocer el número de registros (filas) en el Data Frame

In [None]:
empleadosDF.count()

**distinct()**

Crea un nuevo Data Frame a partir de los registros (filas) que sean diferentes en el Data Frame inicial

In [None]:
emp=empleadosDF.distinct()
emp.show()

**drop(var)**

Toma el Data Frame inicial, elimina la variable (columna) especificada y entrega un nuevo Data Frame con el resultado

In [None]:
emp=empleadosDF.drop("Id")
emp.show()

**union(other)**

Este método crea un nuevo Data Frame a partir de la unión de dos Data Frames

In [None]:
nuevos=sqlCtx.createDataFrame([ \
      (4,"Ana",40,"Femenino","Docente",7600000), \
      (5,"Luis",62,"Masculino","Panadero",None)])
nuevos.show()

In [None]:
emp=empleadosDF.union(nuevos)
emp.show()

**dropDuplicates(var)**

Este método devuelve un nuevo Data Frame en el que ha eliminado los registros (filas) duplicadas. Esto ya los hacía el método distinct(), la diferencia es que en dropDuplicates() se puede especificar las columnas a evaluar para considerar que dos registros son duplicados

In [None]:
empd=emp.dropDuplicates()
empd.show()

**fillna(valor,col[ ])**

Este método devuelve un nuevo Data Frame luego de sustituir los valores nulos detectados por el valor especificado.

Adicionalmente, es posible indicarlo cuales son las columnas sobre las que va a operar

In [None]:
empfna=empd.fillna(1000000)
empfna.show()

**dropna(_cuantas, _umbral, _columnas)**

Este método devuelve un nuevo Dat Frame resultado de eliminar los registros (filas) que contienen nulos en el Data Frame inicial.



In [None]:
empd.show()

In [None]:
empdna=empd.dropna("any")
empdna.show()

**filter(condicion)**

Funciona similar al filter de RDD. Devuelve un nuevo Data Frame que contiene los registros para los cuales la condición arroja como resultado True

In [None]:
empf=empleadosDF.filter(empleadosDF.Salario>5000000)
empleadosDF.show()
empf.show()

**GroupBy(var)**

Genera un nuevo Data Frame donde agrupa los registros que coincidan en la variable especificada. 

Luego de agrupar es posible realizar agregaciones para indicar que se desea realizar con los valores de los registros agrupados


In [None]:
empavg=emp.groupBy("Nombre").avg()
empsal=emp.groupBy("Nombre").agg({"Salario":"avg"})
print("Data Frame Inicial")
emp.show()
print("Data Frame promediando todas las variables")
empavg.show()
print("Data Frame promediando una variable específica")
empsal.show()

**orderBy(var)**

Entrega un Data Frame ordenado de acuerdo a la variable indicada

In [None]:
empo=emp.orderBy(emp.Salario.desc())
print("Data Frame Inicial")
emp.show()
print("Data Frame ordebado por salario")
empo.show()

**select(lista_var)**

Genera un nuevo Data Frame que incluye únicamente las variables indicadas del Data Frame original

In [None]:
empleadosDF=empleadosDF.select("Nombre","Sexo","Salario")
print("Data Frame Inicial")
empleadosDF.show()
print("Data Frame nuevo")
emps.show()
empleadosDF.

**selectExpr(expresiones)**

Funciona similar al método select(), pero permite crear nuevas variables (columnas) a partir de la evaluación de las expresiones indicadas

In [None]:
emps=empleadosDF.selectExpr("Nombre","Sexo","Salario", "Edad > 50")
print("Data Frame Inicial")
empleadosDF.show()
print("Data Frame nuevo")
emps.show()

**withcolumn(nombre,expresion)**

Al igual que selectExpr() permite adicionar columnas con el resultado de la evaluación de una expresión. La diferencia consiste en poder asignar el nombre de las columnas adicionadas

In [None]:
emps=empleadosDF.withColumn("Mayor de 50", empleadosDF.Edad > 50)
print("Data Frame Inicial")
empleadosDF.show()
print("Data Frame nuevo")
emps.show()

##**Consultas SQL**

Los métodos descritos anteriormente permiten operar los datos para crear registros, leerlos, actualizarlos e incluso eliminarlos, las operaciones básicas de un CRUD.

Seguramente usted está acostumbrado a realizar estas operaciones utilizando lenguaje SQL, pues bien, la clase SqlContext de Spark cuenta con un método llamado sql que permite utilizar el lenguaje SQL para operar un Data Frame

Para poder hacer esto, debe registrar en el motor de SQL una tabla que servirá para referenciar el Data Frame y realizar las operaciones sobre dicha tabla

In [None]:
empleadosDF.registerTempTable("Empleados")

De esta manera cada que requiere utilizar una consulta SQL lo realizará sobre la tabla Empleados

**Lectura de datos**

In [None]:
q1 = sqlCtx.sql("SELECT * FROM Empleados")
q1.show()

**Filtremos los registros que la edad sea mayor que 50**

In [None]:
q2 = sqlCtx.sql('SELECT * FROM Empleados WHERE Sexo >Masculino')
q2.show()

**Leamos solo las variables de interés**

In [None]:
q3 = sqlCtx.sql("""
		SELECT Nombre, Edad, Sexo 
		FROM Empleados
	""")
q3.show()

**Funciones definidas por el usuario**

A demás de las consultas básicas que proporciona SQL, el usuario puede definir sus propias funciones e incorporarlas dentro de una consulta SQL. Esto es de gran ayuda para personalizar las acciones que deseamos realizar sobre los datos

Primero debemos definir la función a utilizar

In [None]:
def funcionEdad(edad):
	if edad < 50:
		return "Menor de 50"
	else:
		return "Mayor de 50"


Ahora debemos registrar la función con la ayuda del SqlContext

In [None]:
sqlCtx.registerFunction("funcionEdad",funcionEdad)


Ahora si podemos empezar a usar nuestra función

In [None]:
q4 = sqlCtx.sql("""
		SELECT Nombre, Sexo, Salario, funcionEdad(Edad) AS Edad_Mayor_50
		FROM Empleados
		""")
q4.show()

#**Ejemplos**

##**Titanic**

El archivo titanic.csv recoge la información de los 2201 pasajeros del titanic. Para cada pasajero se dispone de la siguiente información.

* Clase: Tripulación, Primera, Segunda, Tercera
* Edad: Adulto, Niño
* Sexo: Hombre, Mujer
* Sobrevivió: Si, No

Creemos un Data Frame a partir de este archivo y realicemos algunas operaciones

In [None]:
titanic = sqlCtx.read.option("header",True) \
      .option("delimiter",";") \
     .csv("local/data/titanic.csv")
titanic.show(5)

**Cuántos pasajeros sobrevivieron por clase?**

In [None]:
sob=titanic.filter(titanic.Sobrevivio=="Si")
sob.distinct().show()

In [None]:
sob1=sob.filter(sob.Clase=="Primera")
sob1.count()

In [None]:
cs1=titanic.filter(titanic.Sobrevivio=="Si").filter(titanic.Clase=="Primera").count()
cs2=titanic.filter(titanic.Sobrevivio=="Si").filter(titanic.Clase=="Segunda").count()
cs3=titanic.filter(titanic.Sobrevivio=="Si").filter(titanic.Clase=="Tercera").count()
cst=titanic.filter(titanic.Sobrevivio=="Si").filter(titanic.Clase=="Tripulacion").count()

print("Los sobrevivientes de primera clase son : ",cs1)
print("Los sobrevivientes de segunda clase son : ",cs2)
print("Los sobrevivientes de tercera clase son : ",cs3)
print("Los sobrevivientes de la tripulación son : ",cst)



Y cómo contamos los que no sobrevivieron?

In [None]:
ns1=titanic.filter(titanic.Sobrevivio=="No").filter(titanic.Clase=="Primera").count()
ns2=titanic.filter(titanic.Sobrevivio=="No").filter(titanic.Clase=="Segunda").count()
ns3=titanic.filter(titanic.Sobrevivio=="No").filter(titanic.Clase=="Tercera").count()
nst=titanic.filter(titanic.Sobrevivio=="No").filter(titanic.Clase=="Tripulacion").count()

print("Los que NO sobrevivieron de primera clase son : ",ns1)
print("Los que NO sobrevivieron de segunda clase son : ",ns2)
print("Los que NO sobrevivieron de tercera clase son : ",ns3)
print("Los que NO sobrevivieron de la tripulación son : ",nst)

Calculemos el promedio

In [None]:
print("Promedio de sobrevivientes por clase")
print("Primera clase : ",round((cs1*100)/(cs1+ns1),1))
print("Segunda clase : ",round((cs2*100)/(cs2+ns2),1))
print("Tercera clase : ",round((cs3*100)/(cs3+ns3),1))
print("Tripulación :   ",round((cst*100)/(cst+nst),1))

Veamos otra forma de resolverlo

In [None]:
titanic.registerTempTable("Titanic")
q = sqlCtx.sql("SELECT * FROM Titanic")
q.show(5)

In [None]:
titanic.registerTempTable("Titanic")
q = sqlCtx.sql("SELECT Clase, Sobrevivio FROM Titanic")
q.distinct().show()

In [None]:
def SobrevivioInt(sobrevivio):
		return int (sobrevivio=="Si")

In [None]:
sqlCtx.registerFunction("SobrevivioInt",SobrevivioInt)

In [None]:
sob = sqlCtx.sql("""
		SELECT Clase, SobrevivioInt(Sobrevivio) AS Sobrevivio
		FROM Titanic
		""")
sob.distinct().show()

In [None]:
sob.printSchema()

In [None]:
from pyspark.sql import functions
from pyspark.sql.types import *
sobInt=sob.withColumn("Sobrevivio",sob.Sobrevivio.cast(ByteType()))
sobInt.printSchema()


In [None]:
sobClase=sobInt.groupBy('Clase').sum().show()

In [None]:
totalClase=sobInt.groupBy('Clase').count().show()

In [None]:
promSobClase=sobInt.groupBy('Clase').avg().show()

**Qué pasó con los adultos de tercera clase?**

In [None]:
at=titanic.filter((titanic.Edad=="Adulto") & (titanic.Clase=="Tercera")).select("Sexo","Sobrevivio")
at.distinct().show()

In [None]:
hsi=at.filter((titanic.Sexo=="Hombre") & (titanic.Sobrevivio=="Si")).count()
msi=at.filter((titanic.Sexo=="Mujer") & (titanic.Sobrevivio=="Si")).count()
th=at.filter(titanic.Sexo=="Hombre").count()
tm=at.filter(titanic.Sexo=="Mujer").count()

print("Los Adultos Hombres de terera clase eran : %d y sobrevivieron: %d" %(th,hsi))
print("Los Adultos Mujeres de terera clase eran : %d y sobrevivieron: %d" %(tm,msi))

Veamos otra forma de resolverlo

In [None]:
sob = sqlCtx.sql("""
		SELECT Sexo, SobrevivioInt(Sobrevivio) AS Sobrevivio
		FROM Titanic
    WHERE Clase = 'Tercera' AND Edad = 'Adulto'
		""")
sob.distinct().show()

In [None]:
sob.printSchema()

In [None]:
from pyspark.sql import functions
sobInt=sob.withColumn("Sobrevivio",sob.Sobrevivio.cast(ByteType()))
sobInt.printSchema()
sobInt.distinct().show()

In [None]:
ats=sobInt.groupBy("Sexo").sum().show()

In [None]:
at=sobInt.groupBy("Sexo").count().show()

In [None]:
pats=sobInt.groupBy("Sexo").avg().show()

**Al momento del rescate se cumplió con la premisa "Mujeres y niños primero"**

Revisemos que pasó con las mujeres

In [None]:
m=titanic.filter(titanic.Sexo=="Mujer").select("Edad","Sobrevivio")
m.distinct().show()
tms=m.filter(titanic.Sobrevivio=="Si").count()
tm=m.count()
pms=tms*100/tm
print("El total de mujeres que en el Titanic es: ", tm)
print("El total de mujeres que sobrevivieron es: ", tms)
print("El porcentaje de mujeres que sobrevivieron es: ", round(pms,1))

Ahora veamos que pasó con los niños

In [None]:
n=titanic.filter(titanic.Edad=="Ni�o").select("Sexo","Sobrevivio")
n.distinct().show()
tns=n.filter(titanic.Sobrevivio=="Si").count()
tn=n.count()
pns=tns*100/tn
print("El total de niños que en el Titanic es: ", tn)
print("El total de niños que sobrevivieron es: ", tns)
print("El porcentaje de niños que sobrevivieron es: ", round(pns,1))

Veamos que pasó con mujeres y niños

In [None]:
mn=titanic.filter(((titanic.Edad=="Ni�o") | (titanic.Sexo=="Mujer"))).select("Edad","Sexo","Sobrevivio")
mn.distinct().show()
tmns=mn.filter(titanic.Sobrevivio=="Si").count()
tmn=mn.count()
pmns=tns*100/tn
print("El total de mujeres y niños en el Titanic es: ", tmn)
print("El total de mujeres y niños que sobrevivieron es: ", tmns)
print("El porcentaje de mujeres y niños que sobrevivieron es: ", round(pmns,1))

Veamos otra forma de resolver el ejercicio

In [None]:
titanic.distinct().show()

In [None]:
titanicS = sqlCtx.sql("""
		SELECT Sexo, Edad, SobrevivioInt(Sobrevivio) AS Sobrevivio
		FROM Titanic
		""")
titanicS.distinct().show()

In [None]:
from pyspark.sql import functions
from pyspark.sql.types import *
titanicInt=titanicS.withColumn("Sobrevivio",titanicS.Sobrevivio.cast(ByteType()))
titanicInt.printSchema()
titanicInt.distinct().show()

In [None]:
est=titanicInt.filter(titanicInt.Sexo=="Mujer").\
            agg(functions.sum(titanicInt.Sobrevivio))
est.show()

In [None]:
est=titanicInt.filter(titanicInt.Edad=="Ni�o").\
            agg(functions.sum(titanicInt.Sobrevivio))
est.show()

In [None]:
est=titanicInt.filter((titanicInt.Sexo=="Mujer") | (titanicInt.Edad=="Ni�o")).\
            agg(functions.sum(titanicInt.Sobrevivio))
est.show()

#**Ejercicios**

##**Hurto a personas en Colombia**

Se dispone de un dataset que contiene información relacionada con el hurto a personas en Colombia, son mas de 100.000 casos de hurtos cometidos en el país en la última época

Cada registro presenta la siguiente información

* Departamento	
* Municipio	
* Día 	
* Hora 	
* Zona (urbana, rural)	
* Arma empleada	
* Movil agresor	
* Movil víctima	
* Edad víctima	
* Sexo víctima

Utilizando Data Frames, resuelva a las siguientes inquietudes:

1. Top 10 de los municipios de Antioquia que presentan mayor y menor número de hurtos
2. Tipos de armas más utilizadas en zona urbana y rural
3. Promedio de edad de las víctimas por departamento
4. Tipo de vehículo más utilizado para los hurtos en los fines de semana
5. Promedio de casos de hurto por sexo para cada día de la semana
6. Municipio de Colombia que presenta mayor número de hurtos a mujeres mayores de 40 años

##**Covid 19 en Colombia**

Se dispone de un set de datos que describen la situación del covid19 en Colombia obtenidos de https://www.datos.gov.co/

Estos datos relacionan los casos positivos en Colombia a través de las siguientes variables



*   Ciudad
*   Departamento
*   Atención (casa, fallecido, hospital, hospital/uci, recuperado)
*   Edad
*   Tipo (En estudio, importado, relacionado)
*   Estado (Asintomático, Fallecido, Grave, Leve, Moderado)

Utilicemos Data Frame para solucionar las siguientes inquietudes

**1. Cuánto es el total de fallecidos y recuperados en el país**


**2.   Afectará la pandemia a hombres y  mujeres por igual**


**3.   Cuál es el promedio de edad de los afectados**


**4.   Cuál es el departamento que presenta mayor número de casos**

**5.   Qué porcentaje de los casos son importados**

**6.   Es cierto que en Cartagena la mayoría de los casos son importados**