# Prueba 1 - Big Data

## Francisca Pinto - Francisco Silva

## 29 de enero de 2022

Se parte por importar las librerías necesarias para el desarrollo del problema.

In [15]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession

from pyspark.sql.functions import col, date_format

1. Se importan las librerías <code>SparkConf</code>, <code>SparkContext</code> y <code>SQLContext</code> para realizar la prueba.
2. El pyspark.SparkContext que por defecto es denominado <code>sc</code> ya está configurado.

In [3]:
sc

<SparkContext master=yarn appName=livy-session-3>

Se importa el archivo con formato <code>.parquet</code> desde el bucket correspondiente con el método <code>.read.parquet</code> y luego se muestra el esquema de los datos.

In [4]:
df_transantiago = spark.read.parquet('s3://bigdata-desafio/transantiago')

In [5]:
df_transantiago.printSchema()

root
 |-- fechahoratrx: timestamp (nullable = true)
 |-- codigoentidad: long (nullable = true)
 |-- nombreentidad: string (nullable = true)
 |-- codigositio: long (nullable = true)
 |-- nombresitio: string (nullable = true)
 |-- nrotarjeta: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- from: integer (nullable = true)
 |-- to: integer (nullable = true)

### Ejercicio 1 - Operaciones básicas

1. Determinar la cantidad de transacciones.
2. Mostrar cuál es la primera y la última transacción según la columna <code>fechahoratrx</code>.

La cantidad de transacciones es igual a la cantidad de filas según el diccionario entregado, por lo que puede usarse el método <code>.count()</code>.

In [6]:
df_transantiago.count()

456532128

La cantidad de filas es de **<code>456.532.128</code>**.

Respecto a la segunda consulta, la primera y la última transacción pueden obtenerse con el método <code>.head()</code> modificando el orden de los datos con <code>orderBy</code>.

In [7]:
df_transantiago.orderBy(df_transantiago.fechahoratrx, ascending = True).head(1)

[Row(fechahoratrx=datetime.datetime(2018, 4, 16, 19, 4, 59), codigoentidad=2, nombreentidad=u'U2 - Su Bus', codigositio=14008, nombresitio=u'BJFD-95', nrotarjeta=u'5a00a171594e87599006deff486476b2a822b36eabb5af26fc79c732ef386c69', day=20180819, from=20180811, to=20180813)]

In [8]:
df_transantiago.orderBy(df_transantiago.fechahoratrx, ascending = False).head(1)

[Row(fechahoratrx=datetime.datetime(2018, 9, 3, 22, 55, 22), codigoentidad=15, nombreentidad=u'U1 - Alsacia', codigositio=13942, nombresitio=u'BJFD-13', nrotarjeta=u'12dbec71fb4d0dde9e322db14d98cf0ee569ce1b25c6d763c0f4a1e26c965265', day=20180907, from=20180903, to=20180905)]

### Ejercicio 2 - Operadores del servicio

1. Mostrar los distintos operadores.
2. Cantidad de transacciones de cada operador.

Según el diccionario entregado, los distintos operadores están listados en la columna <code>nombreentidad</code>.

In [9]:
df_transantiago.select('nombreentidad').distinct().show()

+------------------+
|     nombreentidad|
+------------------+
|       U2 - Su Bus|
|          U7 - STP|
|        METRO - OT|
|      Tren Central|
|      U1 - Alsacia|
|       U6 - Redbus|
|         U3 - Vule|
|      U4 - Express|
|U5 - Metropolitana|
+------------------+

Respecto a la cantidad de transacciones por operador, puede agruparse el total de transacciones utilizando el método <code>groupBy</code> sumando las filas en cada caso.

In [10]:
df_transantiago.select('nombreentidad').groupBy('nombreentidad').agg({'nombreentidad' : 'count'}).show()

+------------------+--------------------+
|     nombreentidad|count(nombreentidad)|
+------------------+--------------------+
|       U2 - Su Bus|            39337668|
|          U7 - STP|            17825212|
|        METRO - OT|           202471859|
|      Tren Central|             5659576|
|      U1 - Alsacia|            27317884|
|       U6 - Redbus|            26101394|
|         U3 - Vule|            48198386|
|      U4 - Express|            42893078|
|U5 - Metropolitana|            46727071|
+------------------+--------------------+

### Ejercicio 3 - Tarjetas Bip!

1. Mostrar las 20 tarjetas con más transacciones.
2. Contar las tarjetas que tengan más de dos transacciones por día.

La variable <code>nrotarjeta</code> muestra el código de cada tarjeta, por lo que se agruparán las transacciones según esta variable. Posteriormente la tabla se ordenará según ese conteo, de mayor a menor, y se mostrarán los 20 primeros resultados.

In [12]:
df_transantiago.select('nrotarjeta').groupBy('nrotarjeta').agg({'nrotarjeta' : 'count'}).orderBy('count(nrotarjeta)', ascending = False).show(20)

+--------------------+-----------------+
|          nrotarjeta|count(nrotarjeta)|
+--------------------+-----------------+
|deec3122b409a5fc5...|             1411|
|de9c88570da017d4a...|             1263|
|3067b87bcbd4539f6...|             1194|
|469ea593723b604d5...|             1163|
|803c9ddf9d09d177f...|             1153|
|fa3875d0e0cc7ccd5...|             1153|
|9eeec446133e409fa...|             1142|
|b34fa35f11aa675f1...|             1139|
|96d6b2eb577ab2629...|             1139|
|06cfe644a7b163bec...|             1131|
|7ca23ee541b387e96...|             1125|
|3ed8575b3d1d5281f...|             1118|
|a5c1d005449309320...|             1109|
|1760b3b06c2d4d718...|             1108|
|8d09ad5982ef800f7...|             1102|
|15af9115719b3e3c4...|             1086|
|46313191acb7ab2ec...|             1082|
|773f1ec9877777b91...|             1067|
|64eb75d0b2396ab30...|             1064|
|94f04c2250766d9ff...|             1061|
+--------------------+-----------------+
only showing top

Las tarjetas con más transacciones en el dataset presentan una cantidad entre <code>1400</code> y <code>1060</code> aproximadamente.

Respecto al segundo punto, se debe usar el campo <code>fechahoratrx</code> ya que está en formato <code>timestamp</code>, extraer la hora y generar un nuevo campo solo con el día.

In [23]:
df_transantiago = df_transantiago.withColumn('diatrx', date_format(col('fechahoratrx'), 'dd/MM/yyyy'))

Se solicitarán algunos valores para saber si el método se ejecutó correctamente:

In [24]:
df_transantiago.show(4)

+-------------------+-------------+-------------+-----------+-----------+--------------------+--------+--------+--------+----------+
|       fechahoratrx|codigoentidad|nombreentidad|codigositio|nombresitio|          nrotarjeta|     day|    from|      to|    diatrx|
+-------------------+-------------+-------------+-----------+-----------+--------------------+--------+--------+--------+----------+
|2018-08-08 06:17:00|            4| U4 - Express|       5517|    ZN-6156|10699ec735e3a7ef1...|20180903|20180827|20180829|08/08/2018|
|2018-08-08 06:17:03|            4| U4 - Express|       5517|    ZN-6156|43ce6a265f01a62f1...|20180903|20180827|20180829|08/08/2018|
|2018-08-08 06:17:40|            4| U4 - Express|       5517|    ZN-6156|b5476ff3fd038b53d...|20180903|20180827|20180829|08/08/2018|
|2018-08-08 06:17:44|            4| U4 - Express|       5517|    ZN-6156|c79b63a8c7112ea5f...|20180903|20180827|20180829|08/08/2018|
+-------------------+-------------+-------------+-----------+--------

Con el nuevo campo ya creado se pueden utilizar los métodos <code>groupBy</code> y <code>.agg</code> para generar un nuevo dataframe que posteriormente se filtra según la cantidad de transacciones contadas.

In [25]:
df_transantiago_aux = df_transantiago.select('nrotarjeta', 'diatrx').groupBy('nrotarjeta', 'diatrx').agg({'diatrx' : 'count'})

df_transantiago_aux.filter(df_transantiago_aux['count(diatrx)'] > 2).select('nrotarjeta').distinct().count()

5353814

<code>5.353.814</code> tarjetas tienen más de dos transacciones en un día.

### Ejercicio 4 - Fechas y rangos horarios

1. Determinar cuáles son los 10 días con más transacciones.
2. Graficar la cantidad de transacciones por hora, para cada operador, con el eje X como hora, y el eje Y como número de transacciones.

Para la primera parte se trabaja agrupando las filas del dataframe en función de <code>diatrx</code>.

In [26]:
df_transantiago.select('diatrx').groupBy('diatrx').agg({'diatrx' : 'count'}).orderBy('count(diatrx)', ascending = False).show(10)

+----------+-------------+
|    diatrx|count(diatrx)|
+----------+-------------+
|24/08/2018|     35500762|
|28/08/2018|     34907307|
|27/08/2018|     34875930|
|29/08/2018|     31530669|
|23/08/2018|     30946604|
|22/08/2018|     26971860|
|30/08/2018|     25727297|
|16/08/2018|     23844751|
|21/08/2018|     23675505|
|20/08/2018|     23649525|
+----------+-------------+
only showing top 10 rows

Para la segunda parte se realiza lo siguiente:

1. Se crea una columna filtrando la columna <code>fechahoratrx</code> dejando solo la hora.
2. Se crea dataframe auxiliar filtrando transacciones por hora.
3. Instalación en la instancia de librerías <code>matplotlib</code> y <code>seaborn</code>.
4. Creación en <code>local</code> (con query tipo <code>SQL</code>) de tabla con resultados de agrupación por hora y por operador del total de transacciones.
5. Creación en <code>local</code> de gráfico de resultados.

In [27]:
df_transantiago = df_transantiago.withColumn('horatrx', date_format(col('fechahoratrx'), 'dd/MM/yyyy HH'))

In [30]:
df_transantiago_hours = df_transantiago.select('nombreentidad', 'horatrx')\
                                       .groupBy('nombreentidad', 'horatrx')\
                                       .agg({'horatrx' : 'count'})

df_transantiago_hours.show()

+------------------+-------------+--------------+
|     nombreentidad|      horatrx|count(horatrx)|
+------------------+-------------+--------------+
|      U4 - Express|20/08/2018 08|        203015|
|       U6 - Redbus|23/08/2018 19|        120880|
|       U2 - Su Bus|25/08/2018 04|          1770|
|      U4 - Express|26/08/2018 08|         43284|
|      U1 - Alsacia|27/08/2018 14|         96084|
|      U4 - Express|10/08/2018 11|         26655|
|U5 - Metropolitana|11/08/2018 12|         22702|
|U5 - Metropolitana|12/08/2018 23|          5614|
|       U2 - Su Bus|16/08/2018 04|          1884|
|         U3 - Vule|16/08/2018 05|         39439|
|         U3 - Vule|07/08/2018 11|            17|
|          U7 - STP|30/08/2018 01|           130|
|       U2 - Su Bus|23/08/2018 02|           487|
|U5 - Metropolitana|23/08/2018 05|         29087|
|      Tren Central|31/08/2018 06|         10332|
|       U2 - Su Bus|31/08/2018 23|         18565|
|      U4 - Express|19/08/2018 13|         44980|


In [31]:
df_transantiago_hours.registerTempTable('df_transantiago_hours1')