In [2]:
import time
import json
import random

from kafka import KafkaProducer #type: ignore
def generate_sensor_data():
 return {
 "sensor_id": random.randint(1, 10),
 "temperature": round(random.uniform(20, 30), 2),
 "humidity": round(random.uniform(30, 70), 2),
 "timestamp": int(time.time())
 }
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
 value_serializer=lambda x: json.dumps(x).encode('utf-8'))
while True:
 sensor_data = generate_sensor_data()
 producer.send('sensor_data', value=sensor_data)
 print(f"Sent: {sensor_data}")
 time.sleep(1)

Sent: {'sensor_id': 8, 'temperature': 29.05, 'humidity': 54.99, 'timestamp': 1729888523}
Sent: {'sensor_id': 1, 'temperature': 24.33, 'humidity': 36.56, 'timestamp': 1729888524}
Sent: {'sensor_id': 3, 'temperature': 26.87, 'humidity': 49.13, 'timestamp': 1729888525}
Sent: {'sensor_id': 6, 'temperature': 28.82, 'humidity': 46.75, 'timestamp': 1729888526}
Sent: {'sensor_id': 8, 'temperature': 26.18, 'humidity': 69.3, 'timestamp': 1729888527}
Sent: {'sensor_id': 6, 'temperature': 26.51, 'humidity': 38.67, 'timestamp': 1729888528}
Sent: {'sensor_id': 8, 'temperature': 25.63, 'humidity': 43.02, 'timestamp': 1729888529}
Sent: {'sensor_id': 3, 'temperature': 22.6, 'humidity': 38.58, 'timestamp': 1729888530}
Sent: {'sensor_id': 5, 'temperature': 24.58, 'humidity': 55.68, 'timestamp': 1729888531}
Sent: {'sensor_id': 9, 'temperature': 23.92, 'humidity': 44.16, 'timestamp': 1729888532}
Sent: {'sensor_id': 5, 'temperature': 21.3, 'humidity': 44.88, 'timestamp': 1729888533}
Sent: {'sensor_id': 1, '

KeyboardInterrupt: 

In [4]:
import findspark #type: ignore
findspark.init()
from pyspark.sql import SparkSession #type: ignore
from pyspark.sql.functions import from_json, col, window #type: ignore
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, TimestampType #type: ignore
import logging
# Configura el nivel de log a WARN para reducir los mensajes INFO
spark = SparkSession.builder \
 .appName("RiverApps") \
 .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
 .getOrCreate()
spark.sparkContext.setLogLevel("WARN")
# Definir el esquema de los datos de entrada
schema = StructType([
 StructField("sensor_id", IntegerType()),
 StructField("temperature", FloatType()),
 StructField("humidity", FloatType()),
 StructField("timestamp", TimestampType())
])
# Crear una sesión de Spark
spark = SparkSession.builder \
 .appName("RiverApps") \
 .getOrCreate()
# Configurar el lector de streaming para leer desde Kafka
df = spark \
 .readStream \
 .format("kafka") \
 .option("kafka.bootstrap.servers", "localhost:9870") \
 .option("subscribe", "sensor_data") \
 .load()
# Parsear los datos JSON
parsed_df = df.select(from_json(col("selling_price").cast("string"), schema).alias("data")).select("data.*")
# Calcular estadísticas por ventana de tiempo
windowed_stats = parsed_df \
 .groupBy(window(col("timestamp"), "1 minute"), "sensor_id") \
 .agg({"temperature": "avg", "humidity": "avg"})
# Escribir los resultados en la consola
query = windowed_stats \
 .writeStream \
 .outputMode("complete") \
 .format("console") \
 .start()
query.awaitTermination()

In [5]:
#Importamos librerias necesarias
from pyspark.sql import SparkSession, functions as F
# Inicializa la sesión de Spark
spark = SparkSession.builder.appName('Tarea3').getOrCreate()
# Define la ruta del archivo .csv en HDFS
file_path = 'hdfs://localhost:9870/Tarea3/cardata.csv'
# Lee el archivo cardata.csv
df = spark.read.format('csv').option('header','true').option('inferSchema', 'true').load(file_path)
#imprimimos el esquema
df.printSchema()
# Muestra las primeras filas del DataFrame
df.show()
# Estadisticas básicas
df.summary().show()
# Consulta: Filtrar por Selling_Price y seleccionar columnas
print("Selling_Price con valor mayor a  2.85\n")
selling = df.filter(F.col('Selling_Price') >  2.85).select('Selling_Price')
selling.show()
# Ordenar filas por los valores en la columna "Selling_Price" en orden descendente
print("Valores ordenados de mayor a menor\n")
sorted_df = df.sort(F.col("Selling_Price").desc())
sorted_df.show()