#### Comprobar los puertos

In [None]:
import socket

def check_port(host, port, timeout=5):
    """Check whether a TCP connection to ``host:port`` can be opened.

    The host name is resolved and every returned address (IPv4 and IPv6) is
    tried by ``socket.create_connection``; the socket is closed as soon as the
    connection succeeds. On failure the reason is printed and False returned.

    Args:
        host: host name or IP address to connect to.
        port: TCP port number.
        timeout: connection timeout in seconds (defaults to 5, as before).

    Returns:
        True if the connection was accepted, False otherwise.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError as e:
        # socket.error is an alias of OSError; this also covers timeouts,
        # refused connections and DNS resolution failures (socket.gaierror).
        print(f"Connection to {host}:{port} failed: {e}")
        return False

# Check every port the consumer below depends on: the Kafka bootstrap brokers
# listed in `kafka_brokers` (9092, 9091, 9090) and the Schema Registry (8081).
# 29090 is not referenced by the consumer config; it is kept as an extra diagnostic.
KAFKA_HOST = "kafka-azure.norwayeast.cloudapp.azure.com"
for port in (29090, 9092, 9091, 9090, 8081):
    print(f"{KAFKA_HOST}:{port} reachable: {check_port(KAFKA_HOST, port)}")


Connection to kafka-azure.norwayeast.cloudapp.azure.com:29090 failed: timed out
False
True
True


#### Imports

In [None]:
%pip install kafka
%pip install fastavro
%pip install requests
%pip install influxdb-client
dbutils.library.restartPython()

In [None]:
%pip install --upgrade kafka-python
dbutils.library.restartPython()

In [None]:
from pyspark.sql.streaming import DataStreamWriter
from kafka import KafkaConsumer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, substring, expr
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.types import StructType, StructField, StringType, FloatType
import requests
import json
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import datetime as dt
from io import BytesIO
import fastavro

#### Código del consumidor de Databricks

In [None]:
# Fetch the latest Avro schema registered for the "criptomonedas" subject in the Schema Registry.
schema_url = requests.get(
    '{}/subjects/{}/versions/latest/schema'.format("http://kafka-azure.norwayeast.cloudapp.azure.com:8081", "criptomonedas"),
    timeout=10,  # don't hang the notebook if the registry is unreachable
)
# Surface HTTP errors (e.g. unknown subject) instead of treating an error body as the schema.
schema_url.raise_for_status()
schema = schema_url.text

print(schema)

{"type":"record","name":"CryptoCurrency","fields":[{"name":"nombre","type":"string"},{"name":"bid","type":"double"},{"name":"bid_size","type":"double"},{"name":"ask","type":"double"},{"name":"ask_size","type":"double"},{"name":"daily_change","type":"double"},{"name":"daily_change_percentage","type":"double"},{"name":"precio_ultimo","type":"double"},{"name":"volume","type":"double"},{"name":"precio_maximo","type":"double"},{"name":"precio_minimo","type":"double"}]}


In [None]:
#:::::::::::CONFIGURATION::::::::::::
import os

kafka_brokers = "kafka-azure.norwayeast.cloudapp.azure.com:9092,kafka-azure.norwayeast.cloudapp.azure.com:9091,kafka-azure.norwayeast.cloudapp.azure.com:9090"
schema_registry_url = "http://kafka-azure.norwayeast.cloudapp.azure.com:8081"
kafka_topic = "criptomonedas"
influxURL = "https://us-east-1-1.aws.cloud2.influxdata.com/"
influxOrg = "Viewnext"
bucket = "criptodata"
# SECURITY: the InfluxDB token used to be hard-coded in this cell. Because it was
# saved with the notebook it must be considered leaked: revoke/rotate it in
# InfluxDB Cloud and provide the new one via the environment (e.g. a cluster
# environment variable backed by a Databricks secret scope).
influxToken = os.environ.get("INFLUX_TOKEN")
if not influxToken:
    raise RuntimeError("INFLUX_TOKEN is not set; configure it (e.g. from a Databricks secret scope) before running this cell.")

#:::::::::::SCHEMA::::::::::::
# Latest Avro schema registered for the topic's subject.
schema_url = requests.get(
    '{}/subjects/{}/versions/latest/schema'.format(schema_registry_url, kafka_topic),
    timeout=10,
)
schema_url.raise_for_status()  # fail here rather than handing an error page to from_avro
schema = schema_url.text


spark = SparkSession.builder \
    .appName("KafkaSparkConsumer") \
    .getOrCreate()

#:::::::::::KAFKA CONSUMER::::::::::::
# Spark's Kafka source tracks offsets itself and never commits them to Kafka; the
# former unprefixed "enable.auto.commit" option was ignored by Spark, so it was removed.
data = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_brokers)
  .option("subscribe", kafka_topic)
  .option("startingOffsets", "earliest")
  .option("kafka.group.id", "Coins")
  .option("kafka.client.id", "Databricks")
  .load())

# Build a tabular DataFrame from the raw Kafka records:
# 1. Strip the 5-byte Schema Registry header (magic byte + 4-byte schema id);
#    SQL substring() is 1-based, so substring(value, 6) starts right after it.
# 2. Decode the remaining Avro payload into a struct using the fetched schema.
# 3. Cast the message key (the currency name) to a string.
# 4. Flatten the struct fields we need, keeping the Kafka record timestamp.
converted_data = (data
    .withColumn('value', expr("substring(value,6)"))
    .withColumn('value', from_avro(col('value'), schema))
    .withColumn('key', col('key').cast('string'))
    .select("key", "value.bid", "value.bid_size", "value.ask", "value.ask_size", "value.daily_change", "value.daily_change_percentage", "value.precio_ultimo", "value.volume", "value.precio_maximo", "value.precio_minimo", "timestamp")
    )

#:::::::::::INFLUXDB::::::::::::
# A single synchronous write API, shared by every micro-batch.
influxdb_client = InfluxDBClient(url=influxURL, token=influxToken, org=influxOrg)
write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)

def write_to_influx(df, batch_id=None):
  """Write one Structured Streaming micro-batch to InfluxDB.

  Invoked by ``foreachBatch`` with the micro-batch DataFrame and its id. Each row
  becomes a ``criptomonedas`` point tagged ``Moneda`` (the message key) with the
  field ``Precio`` (``precio_ultimo``), timestamped with the row's ``timestamp``.

  Args:
    df: micro-batch of ``converted_data``; needs ``key``, ``precio_ultimo`` and ``timestamp``.
    batch_id: micro-batch id supplied by Spark (unused).
  """
  # collect() pulls the whole micro-batch onto the driver; assumes batches stay small.
  # NOTE(review): Spark returns naive datetimes in the session time zone and
  # influxdb-client treats naive datetimes as UTC — confirm the session time zone is UTC.
  points = [
    Point("criptomonedas")
    .tag("Moneda", row['key'])
    .field("Precio", row['precio_ultimo'])
    .time(row['timestamp'], WritePrecision.NS)
    for row in df.collect()
  ]
  if points:
    # One request per micro-batch instead of one HTTP round-trip per row.
    write_api.write(bucket=bucket, org=influxOrg, record=points)

# Stream the decoded data into InfluxDB.
# TODO(review): no checkpointLocation is set, so progress is not persisted between
# runs and a restarted query starts again from "earliest"; consider
# .option("checkpointLocation", "<dbfs path>") on the writer.
query_influx = converted_data.writeStream.foreachBatch(write_to_influx).start()
display(query_influx)


In [None]:

# Batch writer used by foreachBatch to push decoded micro-batches to InfluxDB.
def write_to_influx(df, batch_id=None):
    """Write one Structured Streaming micro-batch to InfluxDB.

    ``foreachBatch`` calls this with two arguments (the micro-batch DataFrame and
    its id), so ``batch_id`` must be accepted even though it is unused. Each row
    becomes a ``criptomonedas`` point tagged ``Moneda`` (the message key) with the
    field ``Precio`` (``precio_ultimo``), timestamped with the row's ``timestamp``.

    Args:
        df: micro-batch of the decoded stream; needs ``key``, ``precio_ultimo`` and ``timestamp``.
        batch_id: micro-batch id supplied by Spark (unused).
    """
    points = [
        Point("criptomonedas")
        .tag("Moneda", row['key'])
        .field("Precio", row['precio_ultimo'])
        .time(row['timestamp'], WritePrecision.NS)
        for row in df.collect()
    ]
    if points:
        # One request per micro-batch instead of one HTTP round-trip per row.
        write_api.write(bucket=bucket, org="Viewnext", record=points)

# Stop the query started by the previous cell (if any) before replacing its handle:
# otherwise it keeps running unreachable, and two queries sharing the same Kafka
# group id ("Coins") interfere with each other.
previous_query = globals().get("query_influx")
if previous_query is not None and previous_query.isActive:
    previous_query.stop()

# Write the decoded stream; the raw `data` frame only has Kafka's binary key/value
# columns and no `precio_ultimo` field.
query_influx = converted_data.writeStream.foreachBatch(write_to_influx).start()