In [5]:
from confluent_kafka import Consumer
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, when
from pyspark.sql.types import StructType, StringType, IntegerType

# Kafka configuration
bootstrap_servers = 'localhost:9092'
topic = 'streamTopic'
group_id = 'group_id'

conf = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': group_id,
    'auto.offset.reset': 'earliest'
}

# Create Kafka consumer
consumer = Consumer(conf)
consumer.subscribe([topic])

# Create a SparkSession
spark = SparkSession.builder \
    .appName("KafkaConsumerToCassandra") \
    .config("spark.cassandra.connection.host", "172.17.0.2") \
    .config("spark.cassandra.connection.port", "9042") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1, com.datastax.spark:spark-cassandra-connector_2.12:3.0.3") \
    .getOrCreate()

# Define schema for the incoming Kafka message
schema = StructType() \
    .add("userid", StringType()) \
    .add("user_location", StringType()) \
    .add("channelid", StringType()) \
    .add("genre", StringType()) \
    .add("lastactive", StringType()) \
    .add("title", StringType()) \
    .add("watchfrequency", IntegerType()) \
    .add("etags", StringType())

# Read Kafka messages in Spark Streaming
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("subscribe", topic) \
    .load()

# Convert the value column from Kafka into string and parse JSON
parsed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", schema).alias("data")) \
    .select("data.*")

# Perform necessary transformations on the parsed data 
netflix_df = parsed_df.withColumn('impression',
    when(parsed_df['watchfrequency'] < 3, "neutral")
    .when(((parsed_df['watchfrequency'] >= 3) & (parsed_df['watchfrequency'] <= 10)), "like")
    .otherwise("favorite")
)

# Drop the etags values
netflix_transformed_df = netflix_df.drop('etags')

# Write data to Cassandra
query = netflix_transformed_df.writeStream \
    .outputMode("append") \
    .format("org.apache.spark.sql.cassandra") \
    .option("keyspace", "netflixdata") \
    .option("table", "netflix_data") \
    .option("checkpointLocation", "path_to_checkpoint_location") \
    .start()

# Await termination
query.awaitTermination()


RuntimeError: Java gateway process exited before sending its port number

In [None]:
# AWS credentials and Redshift configurations
aws_access_key = 'YOUR_AWS_ACCESS_KEY'
aws_secret_key = 'YOUR_AWS_SECRET_KEY'
redshift_url = 'jdbc:redshift://your-redshift-endpoint:5439/your-database'
redshift_table = 'your_redshift_table'

df.write \
    .format("redshift") \
    .option("url", redshift_url) \
    .option("dbtable", redshift_table) \
    .option("tempdir", "s3://your-s3-bucket/temp-dir") \
    .option("aws_iam_role", "arn:aws:iam::YOUR_AWS_ACCOUNT_ID:role/YourRedshiftRole") \
    .option("aws_access_key_id", aws_access_key) \
    .option("aws_secret_access_key", aws_secret_key) \
    .mode("overwrite") \
    .save()

In [34]:
from confluent_kafka import Consumer
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, when
from pyspark.sql.types import StructType, StringType, IntegerType

# Kafka configuration
bootstrap_servers = 'localhost:9092'
topic = 'streamTopic'
group_id = 'group_id'

conf = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': group_id,
    'auto.offset.reset': 'earliest'
}

# Create Kafka consumer
consumer = Consumer(conf)
consumer.subscribe([topic])
access_key = "AKIAZSSFLKRRD72NTUXF"
secret_access_key = "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"

# Create a SparkSession
spark = SparkSession.builder \
    .appName("KafkaConsumerToCassandra") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1, org.apache.hadoop:hadoop-aws:3.3.4, com.amazonaws:aws-java-sdk-bundle:1.12.262  ") \
    .config("spark.driver.extraClassPath", "C:/BigDataTools/spark-3.5.0/jars/redshift-jdbc42-2.1.0.9.jar") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.endpoint", "https://bibhusha-demo-bucket.s3.us-west-2.amazonaws.com/streamData/") \
    .config("spark.hadoop.fs.s3a.access.key", access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", secret_access_key) \
    .config("spark.hadoop.fs.s3a.path.style.access", "True") \
    .getOrCreate()

# Define schema for the incoming Kafka message
schema = StructType() \
    .add("userid", StringType()) \
    .add("user_location", StringType()) \
    .add("channelid", IntegerType()) \
    .add("genre", StringType()) \
    .add("lastactive", StringType()) \
    .add("title", StringType()) \
    .add("watchfrequency", IntegerType()) \
    .add("etags", StringType())

# Read Kafka messages in Spark Streaming
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("subscribe", topic) \
    .load()

# Convert the value column from Kafka into string and parse JSON
parsed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", schema).alias("data")) \
    .select("data.*")

# Perform necessary transformations on the parsed data 
netflix_df = parsed_df.withColumn('impression',
    when(parsed_df['watchfrequency'] < 3, "neutral")
    .when(((parsed_df['watchfrequency'] >= 3) & (parsed_df['watchfrequency'] <= 10)), "like")
    .otherwise("favorite")
)

# Drop the etags values
netflix_transformed_df = netflix_df.drop('etags')

username = 'awsuser' #redshift db [username]
password = 'Bibhusha123' #redshift db [password]
url = "jdbc:redshift://bibhusha-redshift-cluster-1.c7kundbcoadd.eu-north-1.redshift.amazonaws.com:5439/dev?user=" + username + "&password=" + password


query = netflix_transformed_df.writeStream \
    .outputMode("append") \
    .foreachBatch(lambda batch_df, batch_id: batch_df.write \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", "test") \
        .option("tempdir", "s3://bibhusha-demo-bucket/checkpoint/") \
        .option("aws_access_key_id", access_key) \
        .option("aws_secret_access_key", secret_access_key) \
        .mode("append") \
        .save()) \
    .start()


# Await termination
query.awaitTermination()

StreamingQueryException: [STREAM_FAILED] Query [id = dae22a7b-7321-4f24-9ca6-e072b801f25b, runId = 89d37ba3-4234-4268-a64e-3109f0f740b6] terminated with exception: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "C:\Users\bibhusha.ojha_genese\anaconda3\envs\pyspark_env\Lib\site-packages\py4j\clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\bibhusha.ojha_genese\anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\sql\utils.py", line 115, in call
    raise e
  File "C:\Users\bibhusha.ojha_genese\anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\sql\utils.py", line 112, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "C:\Users\bibhusha.ojha_genese\AppData\Local\Temp\ipykernel_41804\1410475583.py", line 83, in <lambda>
    .save()) \
     ^^^^^^
  File "C:\Users\bibhusha.ojha_genese\anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\sql\readwriter.py", line 1396, in save
    self._jwrite.save()
  File "C:\Users\bibhusha.ojha_genese\anaconda3\envs\pyspark_env\Lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "C:\Users\bibhusha.ojha_genese\anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\errors\exceptions\captured.py", line 169, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "C:\Users\bibhusha.ojha_genese\anaconda3\envs\pyspark_env\Lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o803.save.
: java.sql.SQLException: The connection attempt failed.
	at com.amazon.redshift.util.RedshiftException.getSQLException(RedshiftException.java:56)
	at com.amazon.redshift.Driver.connect(Driver.java:339)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:160)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:156)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:50)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
	at jdk.proxy3/jdk.proxy3.$Proxy32.call(Unknown Source)
	at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:53)
	at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:53)
	at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:34)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:732)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:729)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:729)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:286)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:249)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:239)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:311)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:289)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211)
Caused by: java.net.SocketTimeoutException: Connect timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:546)
	at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:592)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327)
	at java.base/java.net.Socket.connect(Socket.java:751)
	at com.amazon.redshift.core.RedshiftStream.<init>(RedshiftStream.java:86)
	at com.amazon.redshift.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:111)
	at com.amazon.redshift.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:224)
	at com.amazon.redshift.core.ConnectionFactory.openConnection(ConnectionFactory.java:51)
	at com.amazon.redshift.jdbc.RedshiftConnectionImpl.<init>(RedshiftConnectionImpl.java:322)
	at com.amazon.redshift.Driver.makeConnection(Driver.java:502)
	at com.amazon.redshift.Driver.connect(Driver.java:315)
	... 78 more



In [None]:
# # Define a query to aggregate and write batch data to Redshift
# query = netflix_transformed_df.writeStream \
#     .outputMode("append") \
#     .foreachBatch(lambda batch_df, batch_id: batch_df.write \
#         .format("jdbc") \
#         .option("url", url) \
#         .option("dbtable", "your_redshift_table") \
#         .option("driver", "com.amazon.redshift.jdbc.Driver") \
#         .mode("append") \
#         .save()) \
#     .start()

# # Await termination
# query.awaitTermination()
# df.write \
#   .format("io.github.spark_redshift_community.spark.redshift") \
#   .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
#   .option("dbtable", "my_table_copy") \
#   .option("tempdir", "s3n://path/for/temp/data") \
#   .mode("error") \
#   .save()

# # Write the transformed data to S3
# output_path = "s3a://your-s3-bucket/path/to/output"  # Replace with your S3 bucket and path
# query = netflix_transformed_df.writeStream \
#     .format("parquet") \
#     .outputMode("append") \
#     .option("path", output_path) \
#     .option("checkpointLocation", "path/to/checkpoint")  # Provide a checkpoint location
#     .start()

query = json_df.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://your-s3-bucket/checkpoint-dir/") \
    .start("s3a://your-s3-bucket/path/to/save/")

.appName("SparkMinioExample") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://myminio:9000/") \
    .config("spark.hadoop.fs.s3a.access.key", access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", secret_access_key) \
    .config("spark.hadoop.fs.s3a.path.style.access", "True") \

In [40]:
from confluent_kafka import Consumer
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, when
from pyspark.sql.types import StructType, StringType, IntegerType

# Kafka configuration
bootstrap_servers = 'localhost:9092'
topic = 'streamTopic'
group_id = 'group_id'

conf = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': group_id,
    'auto.offset.reset': 'earliest'
}

# Create Kafka consumer
consumer = Consumer(conf)
consumer.subscribe([topic])
access_key = "AKIAZSSFLKRRD72NTUXF"
secret_access_key = "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"

# Create a SparkSession
spark = SparkSession.builder \
    .appName("KafkaConsumerToS3") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", secret_access_key) \
    .config("spark.hadoop.fs.s3a.path.style.access", "True") \
    .getOrCreate()

# Define schema for the incoming Kafka message
schema = StructType() \
    .add("userid", StringType()) \
    .add("user_location", StringType()) \
    .add("channelid", IntegerType()) \
    .add("genre", StringType()) \
    .add("lastactive", StringType()) \
    .add("title", StringType()) \
    .add("watchfrequency", IntegerType()) \
    .add("etags", StringType())

# Read Kafka messages in Spark Streaming
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("subscribe", topic) \
    .load()

# Convert the value column from Kafka into string and parse JSON
parsed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", schema).alias("data")) \
    .select("data.*")

# Perform necessary transformations on the parsed data 
netflix_df = parsed_df.withColumn('impression',
    when(parsed_df['watchfrequency'] < 3, "neutral")
    .when(((parsed_df['watchfrequency'] >= 3) & (parsed_df['watchfrequency'] <= 10)), "like")
    .otherwise("favorite")
)

# Drop the etags values
netflix_transformed_df = netflix_df.drop('etags')

# username = 'awsuser' #redshift db [username]
# password = 'Bibhusha123' #redshift db [password]
# url = "jdbc:redshift://bibhusha-redshift-cluster-1.c7kundbcoadd.eu-north-1.redshift.amazonaws.com:5439/dev?user=" + username + "&password=" + password

output_path = "s3a://bibhusha-demo-bucket/streamData/"
query = netflix_transformed_df.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("checkpointLocation", "s3a://bibhusha-demo-bucket/checkpoint/") \
    .start(output_path)

# Await termination
query.awaitTermination()


Py4JJavaError: An error occurred while calling o1052.start.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:135)
	at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:322)
	at org.apache.spark.sql.streaming.DataStreamWriter.createV1Sink(DataStreamWriter.scala:442)
	at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:407)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:233)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
	... 24 more


In [None]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("KafkaConsumerToS3") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.endpoint", "https://s3.us-west-2.amazonaws.com") \
    .config("spark.hadoop.fs.s3a.access.key", access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", secret_access_key) \
    .config("spark.hadoop.fs.s3a.path.style.access", "True") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true") \
    .config("spark.hadoop.fs.s3a.impl.disable.cache", "true") \
    .getOrCreate()
