<a href="https://colab.research.google.com/github/Anette-99/API/blob/master/Clean_DATASET(Muerte_y_Consumo).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**1.-** Primero Instalamos la paqueteria de pySpark

In [None]:
pip install pyspark

# **Muertes por sobredosis**

**2.-** Leer el csv. Crear un objeto de tipo DataFrameReader usando nuestra propiedad spark.read Creamos un objeto de lectura especificando los Headers.
Importamos SparkSession.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Muertes = spark.read.csv("Muertes_sucio.csv")
Muertes.show()

**3.-** Para poder especificar los nombres de columnas debemos de colocar el siguiente codigo.

In [None]:
Muertes = spark.read.options(header="true").csv("Muertes_sucio.csv")
Muertes.show()

**4.-** Ahora vamos a mostrar los tipos de datos que tiene el Dataset. Lo que debemos de hacer prmero es importar "pprint", lo que nos indicara que tipo de datos hay en el csv.

In [None]:
from pprint import pprint
pprint(Muertes.dtypes)

**5.-** Definimos el esquema de tipos de datos adecuado.

In [None]:
from pyspark.sql.types import *
schema = StructType([StructField("ID", IntegerType(), nullable=False), 
                     StructField("FECHA", DateType(), nullable=False), 
                     StructField("FECHA DE INFORME: 1, FECHA DE FALLECIMIENTO: 0", IntegerType(), nullable=False), 
                     StructField("EDAD", ByteType(), nullable=True), 
                     StructField("SEXO", StringType(), nullable=False), 
                     StructField("RAZA", StringType(), nullable=True), 
                     StructField("DESCRIPCI�N DE LA LESI�N", StringType(), nullable=False), 
                     StructField("CAUSA DE LA MUERTE", StringType(), nullable=False),
                     StructField("OTROS FACTORES SIGNIFICATIVO", StringType(), nullable=False), 
                     StructField("HEROINA", IntegerType(), nullable=True), 
                     StructField("COCAINA", IntegerType(), nullable=True), 
                     StructField("FENTANILO", IntegerType(), nullable=True), 
                     StructField("ETANOL", IntegerType(), nullable=True), 
                     StructField("Manera de morir", StringType(), nullable=True)])
#Muertes1 = spark.read.schema(schema).options(header="true").csv("Muertes_sucio.csv")
Muertes1 = spark.read.options(header="true").schema(schema).csv("Muertes_sucio.csv")
pprint(Muertes1.dtypes)
Muertes1.show()

**6.-** Eliminar las filas erróneas

In [None]:
Muertes1RowRemoved = spark.read.schema(schema).options(header="true", mode = "DROPMALFORMED").csv("Muertes_sucio.csv")
Muertes1RowRemoved.show()

**7.-** Este codigo va despues de nuestro esquema, para corregir y visualizar la fecha correctamente.
Podemos visualizar que de este modo estamos limpiando el Datset

In [None]:
MuertesRowsRemoved = spark.read.schema(schema).options(header="true", mode = "DROPMALFORMED", dateFormat = "dd/MM/yyyy").csv("Muertes_sucio.csv")
pprint(MuertesRowsRemoved.dtypes)
MuertesRowsRemoved.show()

**8.-** Rellenar valores vacios.

Con la palabra reservada fillna rellenamos las celdas que aparecen vacias con un valor. En mi caso tuve que copiar ne nuevo el código para rellenar otra columna diferente.

In [None]:
MN123= spark.read.schema(schema).options(header="true", dateFormat = "dd/MM/yyyy").csv("Muertes_sucio.csv")
MN123.show()
MuerteNew= MN123.fillna({ "FECHA DE INFORME: 1, FECHA DE FALLECIMIENTO: 0":0,"EDAD":23,"SEXO": "Female" ,"RAZA": "White", "OTROS FACTORES SIGNIFICATIVO": "No"})

**9.-** Visualizamos los resultados.

In [None]:
MuerteNew.show()

**10.-** Descargamos el archivo de manera que podamos volverlo a utilizar.

In [None]:
MuerteNew.write.options(header="true").csv("MuerteNew.csv")

**11.-** Utilizar condiciones para afectar valores especificos.
Aqui afectaremos la fecha:

In [None]:
MNew = spark.read.schema(schema).options(header="true", dateFormat = "dd/MM/yyyy").csv("MuerteNew.csv")
MNew.show()

**12.-** Aqui le vamos a incrementar 1 año: Utilizando el date.today significa que estamos por utilizar la fecha de hoy.

In [None]:
from pyspark.sql.functions import col, when
from datetime import date
one_year_from_now = date.today().replace(year=date.today().year + 1)
New = MNew.withColumn("FECHA", when(col("FECHA")> one_year_from_now, None).otherwise(col("FECHA")))
pprint(New.dtypes)

**13.-** Podemos visualizar que datos son los que son superiores al año actual:

In [None]:
New.show()

**14.-** Ahora eliminamos las filas que contiene valores nulos de nuestra tabla anterior.

In [None]:
MN1234remove = New.dropna()
MN1234remove.show()

**15.-** Procedemos a descargar nuestro dataset limpio:

In [None]:
MN1234remove.write.options(header="true").csv("Muerte_Limpia.csv")

------------------------------------------------------------------------

# **Consumo de drogas por edad**

**1.-** Leer el csv. Crear un objeto de tipo DataFrameReader usando nuestra propiedad spark.read Creamos un objeto de lectura especificando los Headers. Importamos SparkSession.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Consumo = spark.read.csv("Consumo_sucio.csv")
Consumo.show()

**2.-** Para poder especificar los nombres de columnas debemos de colocar el siguiente codigo.

In [None]:
Consumo = spark.read.options(header="true").csv("Consumo_sucio.csv")
Consumo.show()

**3.-** Ahora vamos a mostrar los tipos de datos que tiene el Dataset. Lo que debemos de hacer prmero es importar "pprint", lo que nos indicara que tipo de datos hay en el csv.

In [None]:
from pprint import pprint
pprint(Consumo.dtypes)

**4.-** Definimos el esquema de tipos de datos adecuado.

In [None]:
from pyspark.sql.types import *
schema = StructType([StructField("EDAD", IntegerType(), nullable=False),
                     StructField("FECHA", DateType(), nullable=True),  
                     StructField("No PERSONAS", ByteType(), nullable=False), 
                     StructField("Uso de alcohol",FloatType(), nullable=False),
                     StructField("Frecuencia de alcohol", FloatType(), nullable=False), 
                     StructField("Uso de marihuana", FloatType(), nullable=False), 
                     StructField("Frecuencia de la marihuana", FloatType(), nullable=False), 
                     StructField("Uso de coca�na", FloatType(), nullable=False), 
                     StructField("Frecuencia de coca�na", FloatType(), nullable=False),
                     StructField("Uso de grietas", FloatType(), nullable=False), 
                     StructField("Frecuencia de grietas", FloatType(), nullable=False), 
                     StructField("Uso de hero�na", FloatType(), nullable=False), 
                     StructField("Frecuencia de hero�na", FloatType(), nullable=False)])

Consumo = spark.read.options(header="true").schema(schema).csv("Consumo_sucio.csv")
pprint(Consumo.dtypes)
Consumo.show()

**5.-** Eliminar las filas erróneas

In [None]:
CONS1RowsRemoved = spark.read.schema(schema).options(header="true", mode = "DROPMALFORMED").csv("Consumo_sucio.csv")
CONS1RowsRemoved.show()

**6.-** Este codigo va despues de nuestro esquema, para corregir y visualizar la fecha correctamente. Podemos visualizar que de este modo estamos limpiando el Datset

In [None]:
CONS1RowsRemoved = spark.read.schema(schema).options(header="true", mode = "DROPMALFORMED", dateFormat = "dd/MM/yyyy").csv("Consumo_sucio.csv")
pprint(CONS1RowsRemoved.dtypes)
CONS1RowsRemoved.show()

**7.-** Rellenar valores vacios.
Con la palabra reservada **fillna** rellenamos las celdas que aparecen vacias con un valor. En mi caso tuve que copiar ne nuevo el código para rellenar otra columna diferente.

In [None]:
CN123= spark.read.schema(schema).options(header="true", dateFormat = "dd/MM/yyyy").csv("Consumo_sucio.csv")
CN123.show()
ConsumoNew= CN123.fillna({ "Uso de grietas":2.8,"Frecuencia de grietas":2.3, "Uso de hero�na":0,"Frecuencia de hero�na":3.1})

**8.-** Visualizamos los resultados.

In [None]:
ConsumoNew.show()

**9.-** Descargamos el archivo de manera que podamos volverlo a utilizar.

In [11]:
ConsumoNew.write.options(header="true").csv("ConsumoNew.csv")

**10.-** Utilizar condiciones para afectar valores especificos. Aqui afectaremos la fecha:

In [None]:
CNew = spark.read.schema(schema).options(header="true", dateFormat = "dd/MM/yyyy").csv("ConsumoNew.csv")
CNew.show()

**11.-** Aqui le vamos a incrementar 1 año: Utilizando el date.today significa que estamos por utilizar la fecha de hoy.

In [None]:
from pyspark.sql.functions import col, when 
from datetime import date
one_year_from_now = date.today().replace(year=date.today().year + 1)
New = CNew.withColumn("FECHA", when(col("FECHA")> one_year_from_now, None).otherwise(col("FECHA")))
pprint(New.dtypes)

**12.-** Visualizamos los resultados:

In [None]:
New.show()

**13.-** Aqui solo aplicamos las columnas que ocuparemos con sus datos:

In [None]:
Consumo_Limp = New.select("EDAD", "No PERSONAS", "Uso de alcohol", "Frecuencia de alcohol", "Uso de marihuana", "Frecuencia de la marihuana", "Uso de coca�na", "Frecuencia de coca�na", "Uso de grietas", "Frecuencia de grietas", "Uso de hero�na", "Frecuencia de hero�na")
pprint(Consumo_Limp.dtypes)
Consumo_Limp.show()

**14.-** Procedemos a descargar nuestra data limpia:

In [22]:
Consumo_Limp.write.options(header="true").csv("Consumo_Limpia.csv")