### Module Imports
1. `import pyspark.sql.functions as F`: Imports the PySpark SQL functions module under the alias `F`. This module contains the built-in functions for data manipulation and transformation in PySpark.

2. `from pyspark.sql.functions import col, struct, to_json, when`: Imports the functions `col`, `struct`, `to_json`, and `when` directly from the `pyspark.sql.functions` module.
   - `col` refers to a column in a `DataFrame` by its name.
   - `struct` combines multiple columns into a single `struct` column, which groups related data together.
   - `to_json` converts a `struct` column (or an array or map column) into a JSON string.
   - `when` builds a conditional expression, similar to SQL `CASE WHEN`.

3. `from pyspark.sql.types import *`: Imports all types from the `pyspark.sql.types` module, which contains the data types used in PySpark DataFrame schemas.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, struct, to_json, when
from pyspark.sql.types import *

### Reading Data from the Kafka topic `raw_data` into a `DataFrame`
1. `df_read`: This variable stores the `DataFrame` created by a batch read (`spark.read`) from a Kafka topic.

2. `.format("kafka")`: Specifies that the data source format is Kafka.

3. `.option("kafka.bootstrap.servers", "pkc-p11xm.us-east-1.aws.confluent.cloud:9092")`: Sets the Kafka bootstrap servers.

4. `.option("subscribe", "raw_data")`: Specifies the topic to subscribe to ("raw_data" in this case).

5. `.option("startingOffsets", "earliest")`: Sets the starting offset for reading messages from the topic to the earliest available offset.

6. `.option("endingOffsets", "latest")`: Sets the ending offset for reading messages from the topic to the latest available offset.

7. `.option("kafka.security.protocol","SASL_SSL")`: Sets the security protocol to `SASL_SSL` for secure communication.

8. `.option("kafka.sasl.mechanism", "PLAIN")`: Specifies the `SASL` mechanism as PLAIN.

9. `.option("kafka.sasl.jaas.config", "...")`: Sets the JAAS configuration for `SASL` authentication with the provided username and password.

10. `.load()`: Loads the data from Kafka into the `DataFrame`.

11. `display(df_read)`: Displays the `DataFrame` in the output.
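
Because the JAAS string embeds a username and password, one option is to assemble it from values supplied at run time rather than typing them into the notebook. The sketch below is a minimal illustration only: the helper `build_jaas_config` and the environment variable names `KAFKA_SASL_USERNAME` and `KAFKA_SASL_PASSWORD` are hypothetical and not part of the original notebook.

```python
import os


def build_jaas_config(username: str, password: str) -> str:
    """Return a JAAS config string for SASL/PLAIN in the format used in this notebook.

    The `kafkashaded.` prefix matches the login module class name that the
    notebook passes in its Kafka options.
    """
    return (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        f'required username="{username}" password="{password}";'
    )


# Hypothetical environment variable names; adjust to however secrets are provided.
jaas_config = build_jaas_config(
    os.environ.get("KAFKA_SASL_USERNAME", "<username>"),
    os.environ.get("KAFKA_SASL_PASSWORD", "<password>"),
)
```

The resulting string could then be passed as `.option("kafka.sasl.jaas.config", jaas_config)` in place of the literal value.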

In [0]:
df_read = spark \
      .read \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "pkc-p11xm.us-east-1.aws.confluent.cloud:9092") \
      .option("subscribe", "raw_data") \
      .option("startingOffsets", "earliest") \
      .option("endingOffsets", "latest")  \
      .option("kafka.security.protocol","SASL_SSL") \
      .option("kafka.sasl.mechanism", "PLAIN") \
      .option("kafka.sasl.jaas.config", """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{YOUR_SASL_USERNAME}" password="{YOUR_SASL_PASSWORD}";""") \
      .load()
display(df_read)

### Schema Definition
1. `StructType`, `StructField`, `IntegerType`, and `StringType` come from the `pyspark.sql.types` module, which was already imported with `from pyspark.sql.types import *`.
2. `json_schema = StructType([...])`: Defines the schema of the JSON payload using `StructType`, which describes structured data as an ordered list of fields.
3. `StructField("_id", IntegerType(), True)`: Defines a field named `_id` with an integer data type (`IntegerType()`) that allows null values (`True`).
4. `StructField("name", StringType(), True)`: Defines a field named `name` with a string data type (`StringType()`) that allows null values (`True`).
5. The remaining fields, `age`, `grade`, `attendance`, and `marks_outof350`, are defined the same way with their respective data types (`IntegerType()` or `StringType()`) and nullability.
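
One way to read the schema is as a list of (name, type, nullable) rules that each record is expected to follow. The plain-Python sketch below expresses the same six fields that way and checks a made-up record against them. It is an illustration only: `SCHEMA_RULES` and `violations` are hypothetical names, and this is not how Spark applies a schema internally.

```python
# (field name, expected Python type, nullable), mirroring json_schema
SCHEMA_RULES = [
    ("_id", int, True),
    ("name", str, True),
    ("age", int, True),
    ("grade", str, True),
    ("attendance", int, True),
    ("marks_outof350", int, True),
]


def violations(record: dict) -> list:
    """Return the names of fields whose values do not fit the rules."""
    bad = []
    for name, expected_type, nullable in SCHEMA_RULES:
        value = record.get(name)  # a missing field is treated like a null
        if value is None:
            if not nullable:
                bad.append(name)
        elif not isinstance(value, expected_type):
            bad.append(name)
    return bad


# Made-up record: "age" is a string where an integer is expected.
problems = violations({"_id": 7, "name": "Asha", "age": "fifteen", "grade": "A"})
```

Since every field in the schema is nullable, a record with missing fields produces no violations in this sketch; only a value of the wrong type is reported.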

In [0]:
json_schema = StructType(
    [   StructField("_id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("grade", StringType(), True),
        StructField("attendance", IntegerType(), True),
        StructField("marks_outof350", IntegerType(), True)
    ]
)

### Parse the Data Read from the Kafka topic
In this code snippet, the DataFrame `df_read` is transformed using PySpark operations to extract specific fields from a JSON column and create a new DataFrame `df` with the selected fields. Here's an explanation of each part:

1. `df = df_read.withColumn('value', F.from_json(F.col('value').cast('string'), json_schema))`: This line replaces the existing 'value' column of `df_read`. The Kafka source delivers 'value' as binary, so it is first cast to a string, and `from_json()` then parses that JSON string according to `json_schema`. The result is a DataFrame whose 'value' column is a struct containing the parsed JSON fields.

2. `.select(`: This method is used to select specific columns from the DataFrame.

3. `F.col("value._id").alias("student_id")`: This selects the '_id' field from the 'value' column and renames it as 'student_id'.

4. `F.col("value.name")`: This selects the 'name' field from the 'value' column.

5. `F.col("value.age")`: This selects the 'age' field from the 'value' column.

6. `F.col("value.grade")`: This selects the 'grade' field from the 'value' column.

7. `F.col("value.attendance")`: This selects the 'attendance' field from the 'value' column.

8. `F.col("value.marks_outof350")`: This selects the 'marks_outof350' field from the 'value' column.

9. `)`: Closing bracket for the `select()` method.

10. `display(df)`: Finally, the `display()` function is used to show the DataFrame `df` in a tabular format. This function is typically used in Databricks notebooks or similar environments to visualize the DataFrame content.

In summary, this code snippet processes a DataFrame by parsing a JSON column, selecting specific fields from the parsed JSON data, and displaying the resulting DataFrame `df` with the selected fields.
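
As a rough mental model of what the parsing and selection steps do to a single Kafka record, the plain-Python sketch below decodes a `value` payload, parses it, and picks out the same fields. It is only an analogy: Spark's `from_json` works on whole columns and applies the schema's types, and the sample payload and the helper `parse_record` are made up for illustration.

```python
import json

# Field names taken from json_schema; "_id" is exposed as "student_id".
FIELDS = ["_id", "name", "age", "grade", "attendance", "marks_outof350"]


def parse_record(value: bytes) -> dict:
    """Decode a Kafka message value and flatten it into the selected fields."""
    try:
        parsed = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        parsed = {}  # treat an unparseable payload as all-null in this sketch
    if not isinstance(parsed, dict):
        parsed = {}
    row = {name: parsed.get(name) for name in FIELDS}  # missing fields become None
    row["student_id"] = row.pop("_id")  # mirrors .alias("student_id")
    return row


# Made-up payload with no "marks_outof350" field.
sample = b'{"_id": 1, "name": "Asha", "age": 15, "grade": "A", "attendance": 20}'
row = parse_record(sample)
```

In this sketch a field that is absent from the payload simply ends up as `None`, which is why a later step is needed to fill in missing marks.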


In [0]:
df = df_read.withColumn('value', F.from_json(F.col('value').cast('string'), json_schema))  \
      .select(
                  F.col("value._id").alias("student_id"),
                  F.col("value.name"),
                  F.col("value.age"),
                  F.col("value.grade"),
                  F.col("value.attendance"),
                  F.col("value.marks_outof350")) 
display(df)

In [0]:
# Save the DataFrame as a Delta table named "student_delta".
# saveAsTable() returns None, so the result is not assigned to a variable.
df.write.format("delta").mode("overwrite").saveAsTable("student_delta")

In [0]:
df_student_delta = spark.table('student_delta')
df_student_delta.show()

### Transformations
1. Drop duplicate rows in `df_student_delta`.
2. Fill null values in the `marks_outof350` column:
   - If the value is null, use `attendance` multiplied by 12.
   - Otherwise, keep the existing value.
3. Capitalize the column names in `df_student_delta` (for example, `student_id` becomes `Student_id`).
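
To make the three rules concrete, here is a small plain-Python sketch applied to a few made-up rows. It mirrors the logic only; the sample data and the helper `transform` are hypothetical, and the real work in this notebook is done by Spark.

```python
def transform(rows: list) -> list:
    """Apply the three steps to rows represented as dictionaries."""
    # 1. Drop exact duplicate rows, keeping the first occurrence.
    seen = set()
    unique = []
    for row in rows:
        key = tuple(sorted(row.items()))
        if key not in seen:
            seen.add(key)
            unique.append(row)

    result = []
    for row in unique:
        row = dict(row)  # copy so the input rows are left unchanged
        # 2. Fill a missing mark with attendance * 12. If attendance is also
        #    missing, the mark stays None (in Spark, null * 12 is null).
        if row["marks_outof350"] is None and row["attendance"] is not None:
            row["marks_outof350"] = row["attendance"] * 12
        # 3. str.capitalize() upper-cases the first character and lower-cases the rest.
        result.append({name.capitalize(): value for name, value in row.items()})
    return result


rows = [
    {"student_id": 1, "name": "Asha", "attendance": 20, "marks_outof350": None},
    {"student_id": 1, "name": "Asha", "attendance": 20, "marks_outof350": None},
    {"student_id": 2, "name": "Ravi", "attendance": 25, "marks_outof350": 310},
]
cleaned = transform(rows)
```

Note that `capitalize()` only upper-cases the first letter, so `marks_outof350` becomes `Marks_outof350` and any other upper-case letters in a name would be lower-cased.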

In [0]:
df_student_delta = df_student_delta.dropDuplicates()

df_student_delta = df_student_delta.withColumn("marks_outof350", 
                                               when(col("marks_outof350").isNull(), col("attendance") * 12)
                                               .otherwise(col("marks_outof350")))


df_student_delta = df_student_delta.toDF(*(col_name.capitalize() for col_name in df_student_delta.columns))

In [0]:
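# Sort the table in ascending order of student ID.
# Spark resolves column names case-insensitively by default, so "student_id" matches "Student_id".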
df_student_delta = df_student_delta.orderBy(col("student_id").asc())
df_student_delta.show()

### Writing Data into a new Kafka topic

  1. Select the `name` column as the record key and convert the entire row to a JSON string as the record value.
  2. Write to Kafka topic "transformed_data":
    - Kafka broker: pkc-p11xm.us-east-1.aws.confluent.cloud:9092
    - Security protocol: SASL_SSL
    - SASL mechanism: PLAIN
    - SASL username: {YOUR_SASL_USERNAME}
    - SASL password: {YOUR_SASL_PASSWORD}
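
For a sense of what each outgoing Kafka record looks like, the plain-Python sketch below builds the key and value for one made-up row. It assumes the column names were already capitalized by the earlier transformation step; the helper `to_kafka_record` and the sample row are hypothetical, and `json.dumps` here only approximates the output of Spark's `to_json`.

```python
import json


def to_kafka_record(row: dict) -> tuple:
    """Return (key, value): the student's name and the whole row as a JSON string."""
    key = row["Name"]
    # Compact separators give JSON without spaces after ',' and ':'.
    value = json.dumps(row, separators=(",", ":"))
    return key, value


# Made-up row using the capitalized column names.
key, value = to_kafka_record(
    {"Student_id": 1, "Name": "Asha", "Age": 15, "Grade": "A",
     "Attendance": 20, "Marks_outof350": 240}
)
```

Using the name as the key means records for students with the same name share a key; whether that is acceptable depends on how consumers of the topic use the key.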


In [0]:
df_student_delta.selectExpr("name AS key", "to_json(struct(*)) AS value") \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "pkc-p11xm.us-east-1.aws.confluent.cloud:9092") \
    .option("topic", "transformed_data") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config", """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{YOUR_SASL_USERNAME}" password="{YOUR_SASL_PASSWORD}";""") \
    .save()