In [1]:
# Bootstrap: findspark.init() must run before any pyspark import below so the
# local Spark installation can be found by this kernel.
import findspark 
findspark.init()

In [2]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
from pyspark.sql.functions import col

In [3]:
# Maven coordinates of the extra JARs Spark needs in order to talk to Kafka.
# The connector artifact name embeds the Scala binary version and the Spark
# version, so both are kept as variables.
# TODO: Ensure scala_version is correct for the installed Spark build.
scala_version, spark_version = '2.12', '3.3.2'

packages = []
# Spark <-> Kafka connector for the DataFrame API.
packages.append(
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}'
)
# Kafka client library, pinned explicitly.
packages.append('org.apache.kafka:kafka-clients:3.3.1')

In [4]:
# Create (or reuse) a local Spark session. The Kafka JAR coordinates collected
# in `packages` are handed over as one comma-separated string.
spark = (
    SparkSession.builder
    .master("local")
    .appName("kafka-example")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

In [6]:
# Load the sample CSV; the first line of the file supplies the column names.
df = spark.read.option("header", True).csv('data_test.csv')

In [7]:
# Preview the rows loaded from the CSV to sanity-check the columns.
df.show()

+---+----------+--------------------+----------------+--------------------+---------+----------+-----+------------------------+---------------------+----------+--------+-----------+------------+------+-------+-------+------+----------+--------------+--------------+-----------+-------+---------+--------+--------+----------------+----------+
| id|created_by|        created_date|last_modified_by|  last_modified_date|is_active|     dates|hours|disqualified_application|qualified_application|company_id|group_id|campaign_id|publisher_id|job_id|user_id|bid_set|clicks|spend_hour|invalid_clicks|expired_clicks|impressions|feed_id|  sources|est_time|est_date|est_created_date|conversion|
+---+----------+--------------------+----------------+--------------------+---------+----------+-----+------------------------+---------------------+----------+--------+-----------+------------+------+-------+-------+------+----------+--------------+--------------+-----------+-------+---------+--------+--------+---

In [9]:
# Dry run of the Kafka payload: `key` is the row id as a string and `value`
# is the whole row serialized as a JSON object.
(
    df
    .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
    .show()
)

+---+--------------------+
|key|               value|
+---+--------------------+
|  1|{"id":"1","create...|
|  2|{"id":"2","create...|
|  3|{"id":"3","create...|
|  4|{"id":"4","create...|
|  5|{"id":"5","create...|
|  6|{"id":"6","create...|
|  7|{"id":"7","create...|
|  8|{"id":"8","create...|
|  9|{"id":"9","create...|
| 10|{"id":"10","creat...|
| 11|{"id":"11","creat...|
+---+--------------------+



In [10]:
# Publish every CSV row to the Kafka topic "test1" as a (key, value) record:
# key = row id as string, value = the row as JSON.
# NOTE(review): re-running this cell presumably appends the same rows to the
# topic again - confirm before re-executing.
(
    df
    .selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "test1")
    .save()
)

In [11]:
# Batch (non-streaming) read of the "test1" topic from the local broker.
df_read = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test1")
    .load()
)

In [12]:
# Inspect the raw Kafka records: key and value are still binary at this point,
# next to the topic / partition / offset / timestamp metadata columns.
df_read.show()

+-------+--------------------+-----+---------+------+--------------------+-------------+
|    key|               value|topic|partition|offset|           timestamp|timestampType|
+-------+--------------------+-----+---------+------+--------------------+-------------+
|   null|    [68 65 6C 6C 6F]|test1|        0|     0|2023-07-05 21:10:...|            0|
|   null|[74 68 69 73 20 6...|test1|        0|     1|2023-07-05 21:10:...|            0|
|   null|[74 65 73 74 20 6...|test1|        0|     2|2023-07-05 21:12:...|            0|
|   null| [68 61 68 61 68 61]|test1|        0|     3|2023-07-05 21:12:...|            0|
|   null|       [68 65 68 65]|test1|        0|     4|2023-07-05 21:12:...|            0|
|   null|       [61 62 63 20]|test1|        0|     5|2023-07-05 21:12:...|            0|
|   null|          [78 79 7A]|test1|        0|     6|2023-07-05 21:12:...|            0|
|   null|          [31 32 33]|test1|        0|     7|2023-07-05 21:12:...|            0|
|   [31]|[7B 22 69 64

In [13]:
# Decode the binary Kafka key and value into strings; the metadata columns
# (topic, partition, offset, ...) are dropped here.
final_data = df_read.select(
    col("key").cast("string"),
    col("value").cast("string"),
)

In [14]:
# Keep only the records that carry a key, then project the JSON payload.
# NOTE(review): presumably the key-less records are unrelated test messages
# already on the topic - confirm.
result = (
    final_data
    .where(col('key').isNotNull())
    .select('value')
)

In [15]:
# Preview the JSON payloads that survived the key filter.
result.show()

+--------------------+
|               value|
+--------------------+
|{"id":"1","create...|
|{"id":"2","create...|
|{"id":"3","create...|
|{"id":"4","create...|
|{"id":"5","create...|
|{"id":"6","create...|
|{"id":"7","create...|
|{"id":"8","create...|
|{"id":"9","create...|
|{"id":"10","creat...|
|{"id":"11","creat...|
+--------------------+



In [16]:
# Fetch a single row untruncated to see the full JSON document and the field
# names it contains (used to write the schema for from_json).
result.take(1)

[Row(value='{"id":"1","created_date":"2022-10-21 09:24:25.438490","last_modified_date":"2022-10-21 09:24:25.438490","is_active":"1","dates":"2022-10-21","hours":"6","company_id":"349","group_id":"160","campaign_id":"446","publisher_id":"121","job_id":"11039","bid_set":"0.0000","clicks":"1","spend_hour":"0.0000","feed_id":"108","sources":"Cassandra"}')]

In [17]:
# Schema used to parse the JSON payload. Every field is declared as a
# nullable string; the field order below is the column order of the parsed
# DataFrame.
columns = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        'id',
        'created_date',
        'last_modified_date',
        'is_active',
        'dates',
        'hours',
        'company_id',
        'group_id',
        'campaign_id',
        'publisher_id',
        'job_id',
        'bid_set',
        'clicks',
        'spend_hour',
        'feed_id',
        'sources',
    )
])

In [18]:
# Parse each JSON string into a struct using the `columns` schema, then
# flatten the struct so every JSON field becomes its own top-level column.
haha = (
    result
    .select(F.from_json(F.col('value'), columns).alias('c1'))
    .select('c1.*')
)

In [19]:
# Show the parsed result: one column per field declared in the schema.
haha.show()

+---+--------------------+--------------------+---------+----------+-----+----------+--------+-----------+------------+------+-------+------+----------+-------+---------+
| id|        created_date|  last_modified_date|is_active|     dates|hours|company_id|group_id|campaign_id|publisher_id|job_id|bid_set|clicks|spend_hour|feed_id|  sources|
+---+--------------------+--------------------+---------+----------+-----+----------+--------+-----------+------------+------+-------+------+----------+-------+---------+
|  1|2022-10-21 09:24:...|2022-10-21 09:24:...|        1|2022-10-21|    6|       349|     160|        446|         121| 11039| 0.0000|     1|    0.0000|    108|Cassandra|
|  2|2022-10-21 09:24:...|2022-10-21 09:24:...|        1|2022-10-21|    6|       349|     160|        446|           0| 11034| 0.5000|     1|    0.5000|      0|Cassandra|
|  3|2022-10-26 02:55:...|2022-10-26 02:55:...|        1|2022-10-26|    2|         3|    null|        441|           0| 10991| 0.5000|     1|    