# Prueba de código: Ingeniero de Datos - Orenes Core
Esta prueba está pensada para realizar la entrevista del puesto de Ingeniero de Datos. El objetivo es que este cuaderno sirva de hilo conductor en la entrevista técnica, la prueba está pensada para que puedas mostrar lo que sabes, no para descartar o acreditar candidatos. Es una herramienta a tu disposición, úsala de la forma que creas que mejor muestra tu conocimiento o tus cualidades. Por ejemplo: Si no puedes resolver un paso puedes simular la solución para mostrar otra cosa, si piensas que hay varias formas de resolver un problema o si piensas que hay algún añadido que no se pide siéntete libre de incluirlo.


La entrega consistirá en una copia de esta plantilla con los apartados completados y se revisará durante la entrevista. Esta plantilla no puede editarse, **ANTES DE EMPEZAR** puedes realizar una copia editable en Archivo/Guardar una Copia en Drive. Se valorará durante la entrevista si has llevado un control de versiones en git (usando GitHub por ejemplo).

## Entorno Spark
  Esta pieza de código te permitirá usar una interfaz de Spark en Colab para resolver tus ejercicios.

In [1]:
# Install pyspark
!pip install pyspark
# Import SparkSession
from pyspark.sql import SparkSession
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information
spark
# Import a Spark function from library
from pyspark.sql.functions import col

Collecting pyspark
  Using cached pyspark-3.3.2.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.5
  Using cached py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824010 sha256=68617fc2cd11585ceeb8e4d1aa2a5fdc588da517c2ebc364466f224d9f8c04c9
  Stored in directory: /home/anavarro/.cache/pip/wheels/05/fa/3e/2e840e7c1bb33325381b6b3b0e55a1c9c3e0485d2ca469229d
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.2

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3.1[0m[39;49m -> [0m[32;49m23.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
23/03/14 15:22:38 WARN 

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/14 15:22:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Paso 1
Carga los datos del fichero housing_train.csv que guardado en la carpeta sample_data en un data frame de Spark.

Se hace uso del argumento "header", ya que los datos vienen con cabeceras, además se usa "inferSchema" ya que permite que a cada columna, se le asigne el mejor tipo de dato.

In [12]:
df = spark.read.csv("sample_data/housing_train.csv", header=True, inferSchema=True)
df.columns
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
+---------+--------+----

## Paso 2
Crea otro data frame a apartir de un muestreo aleatorio del 20% del data frame creado. Después crea otro data frames con el resto de registros. Comprueba que al unir los dos data frames creados no existen pérdidas ni duplicados de registors respecto al obtenido en el paso anterior.

Se extrae el 20% de los datos haciendo uso de la función "[sample](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.sample.html)", que junto a su argumento "fraction" permite escoger el porcentaje de datos que se quiere extraer de forma ***aleatoria***, sin embargo para poder reproducir el ejercicio de forma exacta, se añade el argumento "seed".

In [None]:
muestra_df = df.sample(fraction=0.2, seed=1)

Para obtener un dataframe con el resto de datos, se hace usode la función "[exceptAll](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.exceptAll.html)".

In [None]:
resto_df = df.exceptAll(muestra_df)

A continuación se unen ambos df, haciendo uso de la función "[union](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.union.html)", la cual permite realizar un merge entre dos dataframe.

In [None]:
muestra_and_resto = muestra_df.union(resto_df)

Finalmente, para comprobar que no se ha producido ninguna pérdida durante el proceso, se comprueba la longitud tanto del dataframe inicial, como el resultado tras unir ambos dataframe. Además puede darse el caso de que durante el proceso se duplique una fila y se pierda otra, nos encontrariamos en el caso en el que la primera comprobación indicaría que todo esta bien, ya que tienen el mismo número de fila, pero esto puede deverse a que tenemos la duplicada. Es por ello que con la función "[distinct](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.distinct.html)", evitamos que esto ocurra.

In [None]:
if muestra_and_resto.count() == df.count() and muestra_and_resto.distinct().count() == df.distinct().count():
    print("No hay pérdidas ni duplicados al unir los DataFrames")
else:
    print("Se han perdido o duplicado registros al unir los DataFrames")

## Paso 3
Realiza un histograma de cada uno de los campos y compara los dos data frames creados en el paso anterior.

## Paso 4
Partiendo del data frame del primer paso. Obtén el registro que más al norte esté según el número de habitaciones que tenga. Es decir, el registro más al norte entre todos los registros con una sola habitación, el más al norte entre todos los registros de dos habitaciones, y así sucesivamente. 

## Paso 5
Partiendo de los dos data frames del Paso 2. Calcula la media de las latitudes de los cada data frame agregado por el el número de dormitorios. Calcula las difrencias entre las medias de cada data frame para el mismo número de dormitorios. Es decir, la diferencia entre la latitud media de los registros del data frame del 20% con un solo dormitorio y la latitud media del data frame del 80% con un solo dormitorio; Realiza esta operación para todo número de habitaciones. 

## Paso 6
Calcula cualquier medida que consideres interesante en utilizando Spark SQL. Por ejemplo, la media de dormitorios agrupado por las habitaciones que tiene cada registro.