In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [2]:
# Producer-side SparkSession: used to read the cleaned job CSV and publish it to Kafka.
# NOTE(review): the Kafka source/sink used later needs the spark-sql-kafka-0-10
# connector on the classpath; nothing here configures it (e.g. spark.jars.packages)
# — confirm it is supplied by the environment.
spark = (
    SparkSession.builder
    .appName("KafkaProducer")
    .getOrCreate()
)


In [3]:
# Explicit schema for job_cleanData.csv (every column nullable). Because a schema is
# supplied, Spark does not infer types from the file.
schema = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("job_ID", StringType()),
            ("designation", StringType()),
            ("company_id", IntegerType()),
            ("name", StringType()),
            ("work_type", StringType()),
            ("involvement", StringType()),
            ("employees_count", IntegerType()),
            ("total_applicants", IntegerType()),
            ("linkedin_followers", IntegerType()),
            ("job_details", StringType()),
            ("details_id", IntegerType()),
            ("industry", StringType()),
            ("level", StringType()),
            ("City", StringType()),
            ("State", StringType()),
        )
    ]
)

# The first line of the file is a header row, so it is skipped as data.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv("job_cleanData.csv")
)

In [None]:
# Publish every CSV row to the Kafka topic "testing" as a one-off batch write.
# Message key: the job ID. Message value: the whole row serialized as a JSON object.
(
    df.selectExpr(
        "CAST(job_ID AS STRING) AS key",
        "to_json(struct(*)) AS value",
    )
    .write.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "testing")
    .save()
)

In [4]:
# Consumer-side SparkSession.
# NOTE(review): getOrCreate() returns the session already created earlier in this
# notebook, so this is effectively the same session. The "KafkaConsumer" app name
# likely does not rename the running SparkContext — confirm whether this cell is needed.
spark = (
    SparkSession.builder
    .appName("KafkaConsumer")
    .getOrCreate()
)

In [None]:
# Subscribe to the Kafka topic "testing" as a Structured Streaming source.
# For streaming queries, startingOffsets defaults to "latest", which would skip every
# record the batch producer cell above already wrote. Read from "earliest" so those
# records are consumed. The option only applies when a query starts without an
# existing checkpoint; a restarted query resumes from its checkpointed offsets.
# The resulting frame has Kafka's fixed columns (key, value, topic, partition, ...),
# with `value` as binary.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "testing")
    .option("startingOffsets", "earliest")
    .load()
)

In [None]:
# Decode each Kafka message payload back into job rows:
#   1. cast the binary `value` to a string (explicitly aliased back to "value"),
#   2. parse it as JSON using the same `schema` the producer serialized with,
#   3. flatten the parsed struct into top-level columns.
# Requires `from_json` from pyspark.sql.functions (imported in the first cell).
df = (
    df.select(col("value").cast("string").alias("value"))
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

In [None]:
# Start the streaming sink: append each micro-batch of parsed rows as CSV files
# under ./output. The checkpoint directory stores the query's progress, so reruns
# that reuse "/tmp/checkpoint" resume the same query state.
# NOTE(review): no "header" option is set, and nothing here waits on the query
# (no awaitTermination); it keeps running in the background until `query.stop()`.
query = (
    df.writeStream
    .format("csv")
    .outputMode("append")
    .options(checkpointLocation="/tmp/checkpoint", path="output")
    .start()
)