1. Working with RDDs:
   a) Write a Python program to create an RDD from a local data source.
   b) Implement transformations and actions on the RDD to perform data processing tasks.
   c) Analyze and manipulate data using RDD operations such as map, filter, reduce, or aggregate.


In [None]:
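from pyspark import SparkConf, SparkContext

# getOrCreate reuses an existing context, so the cell can be re-run safely
sc = SparkContext.getOrCreate(SparkConf().setAppName("RDDExample"))

# Create an RDD from a local Python list
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data)

# Transformations (lazy): double every element, keep only the even ones
doubled_rdd = rdd.map(lambda x: x * 2)
even_rdd = rdd.filter(lambda x: x % 2 == 0)

# Actions (trigger computation): sum the elements and count them
sum_result = rdd.reduce(lambda x, y: x + y)
count = rdd.count()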

print("Original RDD:", rdd.collect())
print("Doubled RDD:", doubled_rdd.collect())
print("Even Numbers RDD:", even_rdd.collect())
print("Sum of RDD:", sum_result)
print("Count of RDD:", count)
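Task (c) also mentions aggregate, which the cell above does not exercise. The following is a minimal sketch in plain Python (no Spark required) of what `rdd.aggregate(zeroValue, seqOp, combOp)` computes: each partition is folded with `seqOp`, then the partial results are merged with `combOp`. The helper `simulate_aggregate` and the two-way partition split are assumptions made for illustration; Spark chooses the real partitioning itself.

```python
from functools import reduce


def simulate_aggregate(partitions, zero_value, seq_op, comb_op):
    """Mimic rdd.aggregate: fold each partition, then merge the partial results."""
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, partials, zero_value)


# Same data as the cell above, split into two hypothetical partitions
partitions = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]

# The accumulator is a (running_sum, running_count) pair
seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

total, n = simulate_aggregate(partitions, (0, 0), seq_op, comb_op)
mean = total / n
print("Sum:", total, "Count:", n, "Mean:", mean)
```

With a live context, the same two functions should work unchanged as `rdd.aggregate((0, 0), seq_op, comb_op)`, which returns the sum and count in a single pass over the data.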


2. Spark DataFrame Operations:
   a) Write a Python program to load a CSV file into a Spark DataFrame.
   b) Perform common DataFrame operations such as filtering, grouping, or joining.
   c) Apply Spark SQL queries on the DataFrame to extract insights from the data.
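The cell below reads `file.csv` and `other_file.csv` and refers to the columns `id`, `age`, and `city`, but the files themselves are not included. As a hedged convenience, this sketch writes two tiny CSV files with those columns using only the standard library. The rows, the `department` column, and the helper name `write_sample_csvs` are invented for illustration and are not part of the original exercise.

```python
import csv
import os
import tempfile


def write_sample_csvs(directory="."):
    """Write two small CSV files with the columns the DataFrame cell expects."""
    # Illustrative rows only; replace them with real data as needed
    people = [
        {"id": 1, "age": 25, "city": "Paris"},
        {"id": 2, "age": 35, "city": "Berlin"},
        {"id": 3, "age": 42, "city": "Paris"},
        {"id": 4, "age": 29, "city": "Madrid"},
    ]
    extra = [
        {"id": 1, "department": "sales"},
        {"id": 2, "department": "engineering"},
        {"id": 5, "department": "support"},
    ]
    paths = {}
    for name, rows in (("file.csv", people), ("other_file.csv", extra)):
        path = os.path.join(directory, name)
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
            writer.writeheader()
            writer.writerows(rows)
        paths[name] = path
    return paths


if __name__ == "__main__":
    # Demonstrate in a throwaway directory; pass "." to write next to the notebook
    demo_dir = tempfile.mkdtemp()
    print(write_sample_csvs(demo_dir))
```

Calling `write_sample_csvs(".")` once before running the DataFrame cell gives it input where two rows have an age above 30 and two of the ids match across the files.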


In [None]:
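from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

# Load a CSV file; the first row holds column names and types are inferred
df = spark.read.csv("file.csv", header=True, inferSchema=True)

# Filter: keep rows where age is greater than 30
filtered_df = df.filter(df["age"] > 30)

# Group: count rows per city
grouped_df = df.groupBy("city").count()

# Join: inner join with a second CSV on the shared "id" column
df2 = spark.read.csv("other_file.csv", header=True, inferSchema=True)
joined_df = df.join(df2, on="id", how="inner")

# Spark SQL: register the DataFrame as a temporary view and query it
df.createOrReplaceTempView("sql_table")
query_result = spark.sql("SELECT * FROM sql_table WHERE age > 30")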

print("Original DataFrame:")
df.show()
print("Filtered DataFrame:")
filtered_df.show()
print("Grouped DataFrame:")
grouped_df.show()
print("Joined DataFrame:")
joined_df.show()
print("Query Result:")
query_result.show()



3. Spark Streaming:
   a) Write a Python program to create a Spark Streaming application.
   b) Configure the application to consume data from a streaming source (e.g., Kafka or a socket).
   c) Implement streaming transformations and actions to process and analyze the incoming data stream.


In [None]:
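# Note: pyspark.streaming.kafka (the Kafka 0.8 DStream integration) exists only in
# Spark 2.x and was removed in Spark 3.0; it also needs the matching
# spark-streaming-kafka-0-8 package on the classpath.
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils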

spark = SparkSession.builder.appName("KafkaStreamingExample").getOrCreate()

# Create a StreamingContext with a batch interval of 1 second
ssc = StreamingContext(spark.sparkContext, 1)

# Configure the application to consume data from a Kafka streaming source.
# No deserializer entries are needed: createDirectStream decodes keys and
# values as UTF-8 strings by default (its keyDecoder/valueDecoder arguments).
kafka_params = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "spark-streaming-consumer"
}

topics = ["my-topic"]

stream = KafkaUtils.createDirectStream(ssc, topics, kafka_params)

# Implement streaming transformations and actions
lines = stream.map(lambda x: x[1])
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# Print the word counts
word_counts.pprint()

# Start the streaming context
ssc.start()

# Wait for the streaming to finish
ssc.awaitTermination()
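The word-count logic in the cell above can be checked without Spark or Kafka. This is a plain-Python sketch of what the `flatMap` / `map` / `reduceByKey` chain produces for each micro-batch; the helper `count_words` and the sample batches are made up for illustration. Counts are computed per batch and do not accumulate across batches, which matches a DStream pipeline that uses no stateful operation.

```python
from collections import Counter


def count_words(batch_lines):
    """Split each line on single spaces and count the words in one batch."""
    words = [word for line in batch_lines for word in line.split(" ")]
    return Counter(words)


# Two hypothetical micro-batches of message values
batches = [
    ["spark streaming", "spark sql"],
    ["kafka spark"],
]

for i, batch in enumerate(batches):
    print(f"Batch {i}:", dict(count_words(batch)))
```

Keeping the transformation in a small pure function like this is one way to unit-test streaming logic before wiring it to a live source.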


4. Spark SQL and Data Source Integration:
   a) Write a Python program to connect Spark with a relational database (e.g., MySQL, PostgreSQL).
   b) Perform SQL operations on the data stored in the database using Spark SQL.
   c) Explore the integration capabilities of Spark with other data sources, such as Hadoop Distributed File System (HDFS) or Amazon S3.


In [None]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()

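# Connect Spark with a MySQL database
# (the MySQL Connector/J JAR must be available to Spark, e.g. via --jars)
jdbc_url = "jdbc:mysql://localhost:3306/mydatabase"
connection_properties = {
    "user": "username",
    "password": "password",
    # Connector/J 8+ class name; the older com.mysql.jdbc.Driver is deprecated
    "driver": "com.mysql.cj.jdbc.Driver"
}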

# Read data from MySQL database into a DataFrame
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "mytable") \
    .option("user", connection_properties["user"]) \
    .option("password", connection_properties["password"]) \
    .option("driver", connection_properties["driver"]) \
    .load()

# Perform SQL operations on the data stored in the database using Spark SQL
df.createOrReplaceTempView("mytable")
result = spark.sql("select * from mytable where age > 30")

# Display the result
result.show()

# Explore integration capabilities of Spark with other data sources
# Read data from HDFS
hdfs_path = "hdfs://localhost:9000/path/to/file"
df_hdfs = spark.read.csv(hdfs_path, header=True, inferSchema=True)

# Read data from Amazon S3
s3_path = "s3a://bucket-name/path/to/file"
df_s3 = spark.read.parquet(s3_path)

# Perform operations on the data from different sources
# ...

# Stop the SparkSession
spark.stop()
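The cell above hardcodes the database user and password. As one possible alternative, this sketch builds the same connection properties from environment variables. The variable names `MYSQL_USER`, `MYSQL_PASSWORD`, and `MYSQL_DRIVER`, and the helper `jdbc_properties_from_env`, are assumptions chosen for illustration; the default driver is the Connector/J 8+ class name.

```python
import os


def jdbc_properties_from_env(env=os.environ):
    """Build JDBC connection properties from environment variables."""
    required = ("MYSQL_USER", "MYSQL_PASSWORD")
    missing = [name for name in required if name not in env]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {
        "user": env["MYSQL_USER"],
        "password": env["MYSQL_PASSWORD"],
        # Assumed default: the Connector/J 8+ driver class
        "driver": env.get("MYSQL_DRIVER", "com.mysql.cj.jdbc.Driver"),
    }


# Example with an explicit mapping instead of the real environment
props = jdbc_properties_from_env(
    {"MYSQL_USER": "username", "MYSQL_PASSWORD": "password"}
)
print(sorted(props))
```

The returned dictionary has the same keys as `connection_properties`, so it can feed the `.option("user", ...)`, `.option("password", ...)`, and `.option("driver", ...)` calls unchanged.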
