# Descripción

El ejercicio 2 se basa en modificar el archivo ***demo.csv*** que contiene una tabla con información de aeropuertos en el mundo.
El cuál se basa en tres puntos diferentes:
<ol>
  <li>Generar un diccionario a partir de la columna country (country_id,country).</li>
  <li>Escribir el diccionario como CSV desde spark.</li>
  <li>Leer el CSV generado y hacer inner join con demo.csv</li>
</ol>

Las herramientas a usar son jupyter y pyspark  en su versión 2.4.5.

## Introducción

Se realiza la  importaciones de los módulos para trabajar con la lectura del archivo.

In [1]:
#modules
#spark version  2.4.5
from pyspark.sql import  SQLContext, SparkSession
from pyspark import SparkContext

Se inicializa el SparkContext, SQLContext y SparkSession con el fin de inicializar las APIs Spark y Spark SQL.

In [2]:
name = 'exercise 2'
master = 'local'


sc = SparkContext(appName= name)
sql_c = SQLContext(sc)
spark = SparkSession.builder \
    .appName(name) \
    .enableHiveSupport() \
    .master(master) \
    .getOrCreate()

Se usa el método read.format de Spark SQL para leer el archivo  ***demo.csv*** y con el método *printSchema* muestra el encabezado de dicho archivo. 

In [3]:
df = sql_c.read.format('csv').options(header='true').load('demo.csv')
df.printSchema()

root
 |-- airline_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- alias: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- country: string (nullable = true)
 |-- active: string (nullable = true)



El siguiente ar de líneas se usa con el fin de mostrar los cinco primeros registros del archivo y  el tamaño de filas y columnas, en este caso son *6,162 registros,7 atributos y el id*.

In [4]:
df.show(5,False)
print((df.count(), len(df.columns)))

+----------+--------------------------------------------+-----+----+----+--------+--------------+------+
|airline_id|name                                        |alias|iata|icao|callsign|country       |active|
+----------+--------------------------------------------+-----+----+----+--------+--------------+------+
|-1        |Unknown                                     |\N   |-   |N/A |\N      |\N            |Y     |
|1         |Private flight                              |\N   |-   |N/A |null    |null          |Y     |
|2         |135 Airways                                 |\N   |null|GNL |GENERAL |United States |N     |
|3         |1Time Airline                               |\N   |1T  |RNX |NEXTIME |South Africa  |Y     |
|4         |2 Sqn No 1 Elementary Flying Training School|\N   |null|WYT |null    |United Kingdom|N     |
+----------+--------------------------------------------+-----+----+----+--------+--------------+------+
only showing top 5 rows

(6162, 8)


## Punto 1

Se genera el diccionario a partir de la columna a partir del id que se considera como airline_id y country, para esto se mapea en forma de diccionario los datos entre los indices '0' y '6' que representan los atributos requeridos.

In [5]:
newrdd = df.rdd.map(lambda x : (x[0],x[6]))
type(newrdd)
dictionary = newrdd.collectAsMap()

Se muestra el formato que tiene el diccionario que cumple con los valores que se indican.

In [6]:
i = 0
for k, v in dictionary.items():
  if i == 5:
    break
  print(k,v)
  i+=1

-1 \N
1 None
2 United States
3 South Africa
4 United Kingdom


## Punto 2

Se Escribe el diccionario en un archivo de formato CSV.

En Spark se puede reescribir con las siguientes lienas de código, pero considera cada valor como una columna diferente.

In [7]:
rdd = sc.parallelize([dictionary])
df2 = spark.read.csv(rdd)
#df2.show(e)

Otra forma de reaizar el objetivo es generando un archvo que tiene como nombre ***test.csv*** y que se vaya escribiendo línea por línea del código.

In [8]:
with open('test.csv', 'w') as f:
    f.write("airline_id,country\n")
    for key in dictionary.keys():
        f.write("%s,%s\n"%(key,dictionary[key]))

Se muestra que el archivo ***test.csv*** se creó de manera correcta en el mismo directorio de trabajo.

In [9]:
!ls

demo.csv	  profeco.pdf.zip			   spark-warehouse
dict.csv	  qiskit_exp4_256_apertura_clausura.ipynb  test.csv
mnist_png	  quantum_clasification.ipynb		   Untitled.ipynb
mnist_png.tar.gz  quantum_classification_scale_gray.ipynb


## Punto 3

Leer el archivo ***test.csv*** generado en el punto anterior con el propósito de realizar el comando inner join con ***demo.csv***

Este punto se puede realizar de dos maneras, la primera forma  es posible al llamar el comando join e indicar la condición que se mantengan los  datos que tengan el mismo valor y se agregaran como dos columnas posteriores a *active*.

In [10]:
df2 = sql_c.read.format('csv').options(header='true').load('test.csv')
df3 = df.join(df2, df.airline_id == df2.airline_id)
df3.show(5,False)

print((df3.count(), len(df3.columns)))

+----------+--------------------------------------------+-----+----+----+--------+--------------+------+----------+--------------+
|airline_id|name                                        |alias|iata|icao|callsign|country       |active|airline_id|country       |
+----------+--------------------------------------------+-----+----+----+--------+--------------+------+----------+--------------+
|-1        |Unknown                                     |\N   |-   |N/A |\N      |\N            |Y     |-1        |\N            |
|1         |Private flight                              |\N   |-   |N/A |null    |null          |Y     |1         |None          |
|2         |135 Airways                                 |\N   |null|GNL |GENERAL |United States |N     |2         |United States |
|3         |1Time Airline                               |\N   |1T  |RNX |NEXTIME |South Africa  |Y     |3         |South Africa  |
|4         |2 Sqn No 1 Elementary Flying Training School|\N   |null|WYT |null    |U

La segunda forma es posible en indicar en la función join, sobre el atributo que los dos comparten de la forma ***on=['airline_id']*** y ***how*** indicando que sea *inner*. Este caso sólo agrega la columna country en el dataframe de ***demo.csv***.

In [11]:
df4 = df.join(df2, on=['airline_id'], how='inner')

df4.show(5, False)
print((df4.count(), len(df4.columns)))

+----------+--------------------------------------------+-----+----+----+--------+--------------+------+--------------+
|airline_id|name                                        |alias|iata|icao|callsign|country       |active|country       |
+----------+--------------------------------------------+-----+----+----+--------+--------------+------+--------------+
|-1        |Unknown                                     |\N   |-   |N/A |\N      |\N            |Y     |\N            |
|1         |Private flight                              |\N   |-   |N/A |null    |null          |Y     |None          |
|2         |135 Airways                                 |\N   |null|GNL |GENERAL |United States |N     |United States |
|3         |1Time Airline                               |\N   |1T  |RNX |NEXTIME |South Africa  |Y     |South Africa  |
|4         |2 Sqn No 1 Elementary Flying Training School|\N   |null|WYT |null    |United Kingdom|N     |United Kingdom|
+----------+----------------------------