In [6]:
from pyspark.sql import SparkSession

# Maven coordinates handed to spark.jars.packages, joined into the single
# comma-separated string that the option is given.
_kafka_packages = ",".join(
    [
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
        "org.apache.kafka:kafka-clients:3.2.0",
    ]
)

# Configure the builder step by step, then create (or reuse) the session.
_builder = SparkSession.builder.appName("kakfa")
_builder = _builder.config("spark.jars.packages", _kafka_packages)
spark = _builder.getOrCreate()

# Bare expression so the notebook renders the session summary.
spark

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, struct, col

# Small in-memory sample: one tuple per row, with column names listed separately.
sample_rows = [(1, "Mahesh", 1000), (2, "Rahul", 3000)]
column_names = ["id", "name", "salary"]

df = spark.createDataFrame(sample_rows, column_names)
df.show()

+---+------+------+
| id|  name|salary|
+---+------+------+
|  1|Mahesh|  1000|
|  2| Rahul|  3000|
+---+------+------+



In [8]:
# Project each row into two columns: `key` is the id cast to a string, and
# `value` is the whole row (every column of df) serialised as a JSON document.
message_key = col("id").cast("string").alias("key")
message_value = to_json(struct(*df.columns)).alias("value")

kafka_df = df.select(message_key, message_value)
kafka_df.show()

+---+--------------------+
|key|               value|
+---+--------------------+
|  1|{"id":1,"name":"M...|
|  2|{"id":2,"name":"R...|
+---+--------------------+



In [9]:
import os

# Credentials are read from the environment so they are never stored in the
# notebook source. Set KAFKA_API_KEY and KAFKA_API_SECRET before running this
# cell; os.environ[...] raises KeyError when a variable is missing.
kafka_api_key = os.environ["KAFKA_API_KEY"]
kafka_api_secret = os.environ["KAFKA_API_SECRET"]

# JAAS login entry for the PLAIN mechanism, built from the credentials above.
sasl_jaas_config = (
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
    f"username='{kafka_api_key}' password='{kafka_api_secret}';"
)

# Batch-write kafka_df (its `key` / `value` columns) to the topic over SASL_SSL.
# Options prefixed with "kafka." are the Kafka client settings.
(
    kafka_df.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "serverName:9092")
    .option("topic", "topic_dataframe")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", sasl_jaas_config)
    .save()
)
