# Ejemplo Kafka 2
1. Creamos la *SparkSession*, añadiendo los paquetes necesarios para que Spark pueda conectarse a Kafka.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
import os
import pyspark

# Scala binary version of the Spark build; it is part of the connector's
# Maven artifact name.
scala_version = '2.12'  # TODO: Ensure this is correct
# Reuse the installed PySpark version so the connector version matches it.
spark_version = pyspark.__version__

# Maven coordinates passed to spark-submit through --packages.
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.5.1'
]

# PYSPARK_SUBMIT_ARGS is set here, before the session is built below.
# An existing value is respected and never overwritten.
args = os.environ.get('PYSPARK_SUBMIT_ARGS', '')
if not args:
    args = f'--packages {",".join(packages)}'
    print('Using packages', packages)
    os.environ['PYSPARK_SUBMIT_ARGS'] = f'{args} pyspark-shell'
else:
    print(f'Found existing args: {args}')
    # Pre-set arguments may lack the Kafka connector; say so now instead of
    # failing later when the "kafka" format is first used.
    if 'spark-sql-kafka-0-10' not in args:
        print('WARNING: PYSPARK_SUBMIT_ARGS does not mention spark-sql-kafka-0-10; '
              'the "kafka" format will fail unless the connector is provided some other way.')

# NOTE(review): spark.history.fs.logDirectory looks like a History Server
# setting rather than an application one -- confirm it is needed here.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("kafka-example-2")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "hdfs:///spark/logs/history")
    .config("spark.history.fs.logDirectory", "hdfs:///spark/logs/history")
    .getOrCreate()
)

Using packages ['org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5', 'org.apache.kafka:kafka-clients:3.5.1']
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-76a2ebc9-f3ed-4db7-86ca-6b9d26bc19fd;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.5 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.5 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	found org.apache.kafka#kafka-clients;3.5.1 in central
	found com.github.luben#zstd-jni;1.5.5-1 in centra

2. Creamos el *DataFrame* inicial leyendo datos del topic *facturas* de Kafka, empezando desde el offset más antiguo disponible.

In [2]:
# Streaming source: subscribe to the "facturas" topic through the kafka-1
# broker, starting from the earliest offsets.
kafkaDFS = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka-1:9092",
        "subscribe": "facturas",
        "startingOffsets": "earliest",
    })
    .load()
)

3. Creamos el *esquema* para nuestro streaming DataFrame.

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType
# Nested struct used for the DeliveryAddress field.
delivery_address_type = (
    StructType()
    .add("AddressLine", StringType())
    .add("City", StringType())
    .add("State", StringType())
    .add("PinCode", StringType())
    .add("ContactNumber", StringType())
)

# Element type of the InvoiceLineItems array.
line_item_type = (
    StructType()
    .add("ItemCode", StringType())
    .add("ItemDescription", StringType())
    .add("ItemPrice", DoubleType())
    .add("ItemQty", IntegerType())
    .add("TotalValue", DoubleType())
)

# Schema used to parse each invoice JSON document.
esquema = (
    StructType()
    .add("InvoiceNumber", StringType())
    .add("CreatedTime", LongType())
    .add("StoreID", StringType())
    .add("PosID", StringType())
    .add("CashierID", StringType())
    .add("CustomerType", StringType())
    .add("CustomerCardNo", StringType())
    .add("TotalAmount", DoubleType())
    .add("NumberOfItems", IntegerType())
    .add("PaymentMethod", StringType())
    .add("CGST", DoubleType())
    .add("SGST", DoubleType())
    .add("CESS", DoubleType())
    .add("DeliveryType", StringType())
    .add("DeliveryAddress", delivery_address_type)
    .add("InvoiceLineItems", ArrayType(line_item_type))
)

4. Realizamos varias transformaciones: reducimos el número de campos y obtenemos un nuevo *DataFrame* más limpio, sin subcampos.

In [4]:
from pyspark.sql.functions import from_json, col
# Cast the `value` column to a string and parse it as JSON with `esquema`;
# the parsed struct keeps the column name "value".
json_text = col("value").cast("string")
parsed_value = from_json(json_text, esquema).alias("value")
valueDF = kafkaDFS.select(parsed_value)
valueDF.printSchema()

root
 |-- value: struct (nullable = true)
 |    |-- InvoiceNumber: string (nullable = true)
 |    |-- CreatedTime: long (nullable = true)
 |    |-- StoreID: string (nullable = true)
 |    |-- PosID: string (nullable = true)
 |    |-- CashierID: string (nullable = true)
 |    |-- CustomerType: string (nullable = true)
 |    |-- CustomerCardNo: string (nullable = true)
 |    |-- TotalAmount: double (nullable = true)
 |    |-- NumberOfItems: integer (nullable = true)
 |    |-- PaymentMethod: string (nullable = true)
 |    |-- CGST: double (nullable = true)
 |    |-- SGST: double (nullable = true)
 |    |-- CESS: double (nullable = true)
 |    |-- DeliveryType: string (nullable = true)
 |    |-- DeliveryAddress: struct (nullable = true)
 |    |    |-- AddressLine: string (nullable = true)
 |    |    |-- City: string (nullable = true)
 |    |    |-- State: string (nullable = true)
 |    |    |-- PinCode: string (nullable = true)
 |    |    |-- ContactNumber: string (nullable = true)
 |    |

In [5]:
from pyspark.sql.functions import expr

# Invoice-level fields to keep, pulled out of the parsed `value` struct.
invoice_columns = [
    "value.InvoiceNumber", "value.CreatedTime",
    "value.StoreID", "value.PosID", "value.CustomerType",
    "value.PaymentMethod", "value.DeliveryType", "value.DeliveryAddress.City",
    "value.DeliveryAddress.State", "value.DeliveryAddress.PinCode",
]

# One output row per invoice line item; the item struct lands in `LineItem`.
explodeDF = valueDF.selectExpr(*invoice_columns, "explode(value.InvoiceLineItems) as LineItem")

# Promote each field of the LineItem struct to a top-level column, then drop
# the struct itself so the result has no nested fields.
limpioDF = explodeDF
for item_field in ("ItemCode", "ItemDescription", "ItemPrice", "ItemQty", "TotalValue"):
    limpioDF = limpioDF.withColumn(item_field, expr("LineItem." + item_field))
limpioDF = limpioDF.drop("LineItem")

5. Enviamos los resultados a un *sink*:
   - a) A un fichero *json*.

In [None]:
# File sink: append the flattened invoice lines as JSON under the output path,
# triggering once per minute.
facturaWriterQuery = (
    limpioDF.writeStream
    .queryName("Facturas Kafka Writer")
    .format("json")
    .outputMode("append")
    .trigger(processingTime="1 minute")
    .option("checkpointLocation", "file:///home/jovyan/chk-point-dir-caso4")
    .option("path", "/user/jovyan/salida_kafka_22")
    .start()
)

# Block this cell until the streaming query stops or fails.
facturaWriterQuery.awaitTermination()

   - b) A Kafka, al topic *facturas_salida*.

In [6]:
from pyspark.sql.functions import to_json, struct
# Build the two columns written to Kafka: the invoice number as `key` and the
# whole flattened row serialised to JSON as `value`.
row_columns = [limpioDF[column_name] for column_name in limpioDF.columns]
message_key = limpioDF["InvoiceNumber"].alias("key")
message_value = to_json(struct(*row_columns)).alias("value")
kafkaTargetDF = limpioDF.select(message_key, message_value)

# Kafka sink: append each record to the "facturas_salida" topic.
facturaWriterQuery = (
    kafkaTargetDF.writeStream
    .format("kafka")
    .outputMode("append")
    .option("kafka.bootstrap.servers", "kafka-1:9092")
    .option("topic", "facturas_salida")
    .option("checkpointLocation", "file:///home/jovyan/kafka-facturas-checkpoint152")
    .start()
)

25/04/09 18:19:38 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/09 18:19:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/04/09 18:20:34 WARN KafkaOffsetReaderAdmin: Error in attempt 1 getting Kafka offsets: 
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions(ConsumerStrategy.scala:66)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAll