# Run this example

- Uploads a test file to HDFS
- Streams that file from HDFS into a Kafka topic using PySpark

In [None]:
! echo '========== creating folder /data in hdfs =========='
! hdfs dfs -mkdir /data
! echo '========== putting test file from local folder to hdfs =========='
! hdfs dfs -put db.csv /data/db.csv
! echo '========== listing files in folder /data =========='
! hdfs dfs -ls /data

In [None]:
! echo '========== create kafka topic =========='
! kafka-topics.sh --bootstrap-server localhost:9092 --create --topic example --replication-factor 1 --partitions 1
! echo '========== kafka topic info =========='
! kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic example
! echo '========== read kafka topic for 5 seconds =========='
! kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example --timeout-ms 5000 --from-beginning

In [None]:
# Run this cell first, before any cell that imports or uses pyspark.
# findspark.init() locates the Spark installation and adds pyspark to
# sys.path so that it can be imported from this kernel.
import findspark  # background: https://stackoverflow.com/questions/34998433/create-pyspark-kernel-for-jupyter
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.functions import from_json, to_json, col, struct

# Reuse the active Spark session if one exists, otherwise start a new one
# under the application name "my_spark".
spark = (
    SparkSession.builder
    .appName("my_spark")
    .getOrCreate()
)

# Expect WARN lines (not ERROR) in the output below -- that is fine.

In [None]:
# Stream the CSV files found under /data in HDFS into a Kafka topic.
kafka_server = 'localhost:9092'
topic_name = 'example'

# Columns expected in every CSV file.
schema = (
    StructType()
    .add("column_1", StringType())
    .add("column_2", IntegerType())
)

# Source: CSV files with a header row, picking up at most one new file
# per trigger.
csv_rows = (
    spark.readStream
    .format("csv")
    .option("header", True)
    .option("maxFilesPerTrigger", 1)
    .schema(schema)
    .load("/data")
)

# Shape each row into a Kafka record:
#   key   - an id built from the current unix timestamp plus a random number
#   value - all schema columns of the row serialised as one JSON document
# Both are cast to strings before being handed to the Kafka sink.
kafka_records = (
    csv_rows
    .withColumn('id', F.concat(F.unix_timestamp(), F.rand()))
    .withColumn('value', F.to_json(F.struct(schema.names)))
    .selectExpr("id as key", "value as value")
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
)

# Sink: append the records to the topic. `stream` is the handle returned by
# start() and is what has to be stopped once the example is finished.
stream = (
    kafka_records.writeStream
    .format("kafka")
    .outputMode("append")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("topic", topic_name)
    .option("checkpointLocation", "checkpoints/stream_read_write")
    .start()
)

In [None]:
# Dump the topic from its first offset to check that the CSV rows arrived.
# The consumer exits once no message has been available for 5000 ms.
# NOTE(review): the streaming query started in the previous cell presumably
# runs in the background; if nothing is printed yet, re-run this cell.
! echo '========== reading kafka topic for 5 seconds =========='
! kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example --timeout-ms 5000 --from-beginning

In [None]:
# Stop the streaming query started earlier in this notebook.
# NOTE(review): until stop() is called the query presumably keeps running in
# the background and watching /data for new files.
stream.stop()