# Consumidor

In [0]:
# Leemos el flujo de datos a tráves de Spark Streaming, cargamos los datos con Spark.readStream y con option indicamos el servidor y el topic
vuelos = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", "proyecto") \
    .load()

vuelos.printSchema()

#Leemos los binarios transformandolos a un string
#Y Generamos un timestamp para cada registro pues ocupamos saber en que momento estamos recibiendo el dato
vuelos_string = vuelos.selectExpr("CAST(value AS STRING)", "timestamp")

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [0]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler
import random

# Generamos un esquema para transformar los datos del JSON a una columna de DataFrame para poder manipularlo
schema_string = "flight STRING,lat DOUBLE,lon DOUBLE, status STRING, from STRING"

# Seleccionamos los datos y agregamos la hora de lectura
df2 = vuelos_string \
        .select(from_csv(col("value"), schema_string) \
                .alias("flight"), "timestamp")


# Seleccionamos todos los registros con sus respectivas estampas de tiempo
df3 = df2.select("flight.*", "timestamp")

In [0]:
#Generamos una vista temporal del DataFrame
df3.createOrReplaceTempView("flights");

# Seleccionammos todo como demostración
all_flight = spark.sql("SELECT * FROM flights")

# Generamos un stream que guarda el contenido del DataFrame de transmisión externamente
write_stream = all_flight \
        .writeStream \
        .trigger(processingTime='5 seconds') \
        .outputMode("append") \
        .option("truncate", "false") \
        .format("memory") \
        .queryName("testedTable") \
        .start()

#Espera hasta que la consulta se termine
write_stream.awaitTermination(1)

Out[23]: False

In [0]:
# Obtenemos una consulta estaática de nuestro flujo.
df = spark.sql("SELECT * FROM testedTable")
df.show(5)

+-------+-------+---------+------+----+--------------------+
| flight|    lat|      lon|status|from|           timestamp|
+-------+-------+---------+------+----+--------------------+
|AMX004D|34.4248| -83.6065|     A| CDG|2022-05-19 16:18:...|
| ANA180|36.3623|-121.3503|     A| NRT|2022-05-19 16:18:...|
|CLX6684|  42.11| -83.4187|     A| LUX|2022-05-19 16:18:...|
| AMX002|44.7812| -74.0644|     A| MAD|2022-05-19 16:18:...|
| AFR178|52.2303| -69.6082|     A| CDG|2022-05-19 16:18:...|
+-------+-------+---------+------+----+--------------------+
only showing top 5 rows



## BOT TWITTER

Vamos a generar un BOT mediante la API de Twitter que publique cada que un vuelo arrive al Aeropuerto Internacional de la Ciudad de México. Si gustan obervarlo en tiempo real la cuenta es : **@Crushdumb**

*¡Advertencia!: Esta no es una cuenta educativa, no llegué más allá de los tweets de vuelos. Y no siga la cuenta a menos que quiera estar al tanto de cuando Daniel llora*

Por otro lado, son bienvenidos a seguir a cualquier participante de este equipo en instagram. @Datamaniax patente pendiente.

In [0]:
# Nuevamente generamos una selección de datos junto con el timepo de lectura
df2 = vuelos_string \
         .select(from_csv(col("value"), schema_string) \
         .alias("flight"), "timestamp")

# Filtramos de forma que solo recibamos vuelos en estatus de L: Landing
df3 = df2.select("flight.*", "timestamp").filter(col('status').contains('L'))


# Nuestra vista temporal
df3.createOrReplaceTempView("flights");

#Consultamos todos los datos
find_text = spark.sql("SELECT * FROM flights")

# Generamos un stream
status_stream = find_text \
            .writeStream \
            .trigger(processingTime='5 seconds') \
            .outputMode("append") \
            .option("truncate", "false") \
            .format("memory") \
            .queryName("yavanaaterrizar") \
            .start()

    
status_stream.awaitTermination(1)

Out[25]: False

In [0]:
# Visualizamos mediante una consulta de nuestra tabla creada:
status = spark.sql("SELECT * FROM yavanaaterrizar")
status.show(20)

+-------+-------+--------+------+----+--------------------+
| flight|    lat|     lon|status|from|           timestamp|
+-------+-------+--------+------+----+--------------------+
|VIV1015|19.4267|-99.2431|     L| CUN|2022-05-19 16:19:...|
| TAO141|19.3901|-99.1389|     L| TPQ|2022-05-19 16:19:...|
|VIV1015|19.4267|-99.2431|     L| CUN|2022-05-19 16:20:...|
| TAO141|19.3901|-99.1389|     L| TPQ|2022-05-19 16:20:...|
|VIV1015|19.3887|-99.2094|     L| CUN|2022-05-19 16:20:...|
| TAO141|19.4205|-99.0882|     L| TPQ|2022-05-19 16:20:...|
+-------+-------+--------+------+----+--------------------+



### Conexión.
Vamos a geenrera la conexión con la API de Twitter.

In [0]:
!pip install tweepy

Collecting tweepy
  Downloading tweepy-4.9.0-py3-none-any.whl (77 kB)
[?25l[K     |████▏                           | 10 kB 25.7 MB/s eta 0:00:01[K     |████████▍                       | 20 kB 24.3 MB/s eta 0:00:01[K     |████████████▋                   | 30 kB 12.5 MB/s eta 0:00:01[K     |████████████████▉               | 40 kB 10.0 MB/s eta 0:00:01[K     |█████████████████████           | 51 kB 7.2 MB/s eta 0:00:01[K     |█████████████████████████▏      | 61 kB 8.4 MB/s eta 0:00:01[K     |█████████████████████████████▍  | 71 kB 7.7 MB/s eta 0:00:01[K     |████████████████████████████████| 77 kB 4.8 MB/s 
[?25hCollecting requests<3,>=2.27.0
  Downloading requests-2.27.1-py2.py3-none-any.whl (63 kB)
[?25l[K     |█████▏                          | 10 kB 23.3 MB/s eta 0:00:01[K     |██████████▍                     | 20 kB 26.6 MB/s eta 0:00:01[K     |███████████████▋                | 30 kB 30.7 MB/s eta 0:00:01[K     |████████████████████▊           | 40 kB 

In [0]:
# Definimos y generamos la autentificación con la cuenta:
import tweepy

api_key = 'z2jU69J7IQo4mLiHw2A2t0gth'
api_key_secret = 'LzITB2VBYuw9lmHy62eEhkNcgE0LRRhRJJU04FwkT7besgYhKM'
token = '2800552213-v73BFRA1m7zJOLKws1wbU0h4wLdrAGNjVTHhTBO'
token_secret = 'NfKa7BsfhIZ7UD4lKwqXrGldPHQ2AinHrGMQu3vSQOw6P'

# Autenticamos nuestras llaves
authenticator = tweepy.OAuthHandler(api_key, api_key_secret)
authenticator.set_access_token(token, token_secret)

# Generamos la conexión
api = tweepy.API(authenticator)

### Filtro de Bloom

Esta implementación pide al usuario el número máximo de elementos que estima almacenar en el filtro de Bloom, así como la probabilidad del error de los falsos positivos. 

Define a k, el número de funciones hash como:
$$k = \frac{m}{n}\ln{(2)}$$

Ya que ese valor es el que minimiza la probabilidad de los falsos positivos.

El valor que usa para m, el número de bits es:
$$m = -\frac{n \ln{(ϵ)}}{(\ln{(2)})^2}$$

Puesto que es valor resultante al despejar $m$ de la siguiente expresión y sustituir el valor óptimo para $k$

$$\epsilon ≈ (1 - e^{\frac{-kn}{m}})^k$$

Vamos a hacer uso de una biblioteca que ya tiene implementado el filtro de bloom para nuestra aplicación.

In [0]:
!pip install bloom_filter2

Collecting bloom_filter2
  Downloading bloom-filter2-2.0.0-1.tar.gz (6.6 kB)
  Downloading bloom_filter2-2.0.0-py3-none-any.whl (6.8 kB)
Installing collected packages: bloom-filter2
Successfully installed bloom-filter2-2.0.0
You should consider upgrading via the '/databricks/python3/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
# Definimos un filtro de de Bloom para fltrar los vuelos repetidos.
from bloom_filter2 import BloomFilter

bloom = BloomFilter(max_elements=2000, error_rate=0.05)

In [0]:
!pip install -U airportsdata

Collecting airportsdata
  Downloading airportsdata-20220518-py3-none-any.whl (1.0 MB)
[?25l[K     |▎                               | 10 kB 15.6 MB/s eta 0:00:01[K     |▋                               | 20 kB 12.5 MB/s eta 0:00:01[K     |█                               | 30 kB 10.0 MB/s eta 0:00:01[K     |█▎                              | 40 kB 9.0 MB/s eta 0:00:01[K     |█▋                              | 51 kB 6.7 MB/s eta 0:00:01[K     |██                              | 61 kB 7.7 MB/s eta 0:00:01[K     |██▎                             | 71 kB 7.1 MB/s eta 0:00:01[K     |██▋                             | 81 kB 7.2 MB/s eta 0:00:01[K     |███                             | 92 kB 7.5 MB/s eta 0:00:01[K     |███▎                            | 102 kB 7.5 MB/s eta 0:00:01[K     |███▋                            | 112 kB 7.5 MB/s eta 0:00:01[K     |███▉                            | 122 kB 7.5 MB/s eta 0:00:01[K     |████▏                           | 133 kB 7.5 MB/s 

In [0]:
!pip install emoji-country-flag

Collecting emoji-country-flag
  Downloading emoji_country_flag-1.3.0-py2.py3-none-any.whl (8.0 kB)
Installing collected packages: emoji-country-flag
Successfully installed emoji-country-flag-1.3.0
You should consider upgrading via the '/databricks/python3/bin/python -m pip install --upgrade pip' command.[0m


Generamos una función que permite publicar un tuit cada 30 segundos medinate un ciclo While.

In [0]:
import time
import airportsdata
import flag

t = time.time()
# Definimos la funció para la obtención del código del aeropuerto.
airports = airportsdata.load('IATA')  # key is IATA code

while True:
    status = spark.sql("SELECT * FROM yavanaaterrizar")
    for vuelo in status.rdd.collect():
        if vuelo['flight'] not in bloom:
            air = airports[vuelo['from']]
            bloom.add(vuelo['flight'])
            api.update_status('✈️ El vuelo ' + vuelo['flight'] + ' de ' + air['city']+ ', ' + air['country'] + ' (' + vuelo['from'] + ') '  + flag.flag(air['country']) + ' a La Ciudad de Mexico, Mexico (MEX) 🇲🇽 está ARRIVANDO 🛬 ' + str(vuelo['timestamp']))
    time.sleep(30)
    if (time.time() - t) > 900:
       break