In [1]:
from pyspark.sql import SparkSession
from config import configurations
spark = (
    SparkSession.builder.appName('ReadFromS3')\
    .config('spark.master', 'local')\
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
        "org.apache.hadoop:hadoop-aws:3.3.1,"
        "com.amazonaws:aws-java-sdk:1.11.469")\
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
    .config("spark.hadoop.fs.s3a.access.key", configurations.get("AWS_ACCESS_KEY"))\
    .config("spark.hadoop.fs.s3a.secret.key", configurations.get("AWS_SECRET_KEY"))\
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", 
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")\
    .getOrCreate()
)

24/11/30 14:37:35 WARN Utils: Your hostname, codebase resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/11/30 14:37:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/de-ninja/.ivy2/cache
The jars for the packages stored in: /home/de-ninja/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-94a4bf65-be1d-4aff-b578-c4ab3895f58a;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apach

In [3]:
spark.read.format('csv').option('header','true').load('s3a://simple-s3-for-demo/Customer_Updated.csv').show()

                                                                                

+-----------+-------------+----------+--------+
|customer_id|customer_name| join_date|location|
+-----------+-------------+----------+--------+
|        105|          Eva|2022-01-01|    Ohio|
|        106|        Frank|2022-02-01|  Nevada|
|        107|        Grace|2022-03-01|Colorado|
|        108|        Henry|2022-04-01|    Utah|
+-----------+-------------+----------+--------+



In [38]:
from pyspark.sql.types import *
data = [
    (1, "Alice", 25, "Engineer", 70000.50),
    (2, "Bob", 30, "Data Scientist", 95000.75),
    (3, "Charlie", 35, "Manager", 120000.00),
    (4, "David", 28, "Analyst", 60000.00),
    (5, "Eve", 32, "Developer", 85000.25)
]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("designation", StringType(), True),
    StructField("salary", FloatType(), True)
])

df = spark.createDataFrame(data=data, schema=schema)
df.show()

+---+-------+---+--------------+--------+
| id|   name|age|   designation|  salary|
+---+-------+---+--------------+--------+
|  1|  Alice| 25|      Engineer| 70000.5|
|  2|    Bob| 30|Data Scientist|95000.75|
|  3|Charlie| 35|       Manager|120000.0|
|  4|  David| 28|       Analyst| 60000.0|
|  5|    Eve| 32|     Developer|85000.25|
+---+-------+---+--------------+--------+



In [36]:
r_df = df.repartition(3)
# r_df.rdd.collect()[0].name
r_df.rdd.collect()

[Row(id=5, name='Eve', age=32, designation='Developer', salary=85000.25),
 Row(id=4, name='David', age=28, designation='Analyst', salary=60000.0),
 Row(id=2, name='Bob', age=30, designation='Data Scientist', salary=95000.75),
 Row(id=3, name='Charlie', age=35, designation='Manager', salary=120000.0),
 Row(id=1, name='Alice', age=25, designation='Engineer', salary=70000.5)]

In [25]:
l = [1,2,3,4,5,6,8,3,34,44,5,65,6,7,78,7]
rdd1 = spark.sparkContext.parallelize(l)

In [31]:
rdd1.collect()

[1, 2, 3, 4, 5, 6, 8, 3, 34, 44, 5, 65, 6, 7, 78, 7]

In [22]:
df.write.mode('overwrite').format('parquet').partitionBy('designation')\
.save('s3a://simple-s3-for-demo/employeeDtaa/')

                                                                                

In [23]:
spark.read.format('parquet').load('s3a://simple-s3-for-demo/employeeDtaa/').show()

[Stage 49:>                                                         (0 + 1) / 1]

+---+-------+---+--------+--------------+
| id|   name|age|  salary|   designation|
+---+-------+---+--------+--------------+
|  3|Charlie| 35|120000.0|       Manager|
|  1|  Alice| 25| 70000.5|      Engineer|
|  4|  David| 28| 60000.0|       Analyst|
|  2|    Bob| 30|95000.75|Data Scientist|
|  5|    Eve| 32|85000.25|     Developer|
+---+-------+---+--------+--------------+



                                                                                

In [45]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
spark.sparkContext.setLogLevel('ERROR')
# Kafka configuration
kafka_broker = "localhost:9092"  # Kafka broker
topic_name = "topic_1"        # Kafka topic name

# Read stream from Kafka
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("startingOffsets", 'latest') \
    .option("subscribe", topic_name) \
    .load()

# Select the key and value as strings
schema = StructType([
    StructField('name', StringType(), False),
    StructField('age', IntegerType(), False)
])
kafka_df = kafka_stream.selectExpr("CAST(value AS STRING) AS data")\
.select(F.from_json(F.col('data'), schema = schema).alias('formatted_data'))\
.select("formatted_data.name", "formatted_data.age")

# Print the schema (optional)
# kafka_df.printSchema()
# kafka_df_c = kafka_df.groupby('name').count()

# Process and display the stream (for debugging)
query = kafka_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|name|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+------+---+
|  name|age|
+------+---+
|Bharat| 27|
+------+---+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  name|count|
+------+-----+
|Bharat|    1|
+------+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+------+---+
|  name|age|
+------+---+
|Bharat| 27|
|Bharat| 27|
+------+---+

-------------------------------------------
Batch: 6
-------------------------------------------
+------+---+
|  name|age|
+------+---+
|Bharat| 27|
|Bharat| 27|
|Bharat| 27|
+------+---+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
|  name|count|
+------+-----+
|Bharat|    6|
+------+-----+



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/de-ninja/airflow-venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/de-ninja/airflow-venv/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
{"name" : "Bharat", "age" : 27}

In [None]:
# Step 1 : kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c $KAFKA_HOME/config/kraft/server.properties
# Step 2 : kafka-server-start.sh $KAFKA_HOME/config/kraft/server.properties
# Step 3 : kafka-topics.sh --list --bootstrap-server localhost:9092 --> TO CHECK TOPICS
# Step 4 : kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 'topic_1' --> TO CREATE TOPICS
# Step 5 : kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic_1 --> TO SETUP PRODUCER TO PRODUCE "topic_1"
# Step 6 : kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1 --> TO SETUP A CONSUMER FOR 'topic_1'