In [None]:
from pyspark.sql import SparkSession

# Build the SparkSession (or reuse one already running in this kernel);
# every later cell reaches Spark through the `spark` name defined here.
spark = (
    SparkSession.builder
    .appName("PySparkExample")
    .getOrCreate()
)


In [None]:
# A driver-side counter that tasks running on executors can only add to.
acc = spark.sparkContext.accumulator(0)


def add_to_accumulator(x):
    """Contribute a single RDD element to the shared accumulator ``acc``."""
    acc.add(x)


# foreach is an action, so each task's accumulator updates are applied once.
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
rdd.foreach(add_to_accumulator)

# Only the driver may read the accumulated total.
total = acc.value
print(f"Accumulator value: {total}")


Accumulator value: 15


In [None]:
# Sample DataFrame
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
df = spark.createDataFrame(data, ["Name", "ID"])

# repartition(n) does a full shuffle and can raise or lower the partition count.
df_repartitioned = df.repartition(4)

# coalesce(n) merges existing partitions without a full shuffle; asking for more
# partitions than currently exist leaves the count unchanged.
df_coalesced = df.coalesce(1)

# Printing a DataFrame only shows its schema, which is the same for all three.
# The partition count is what repartition/coalesce actually change, so report that.
print(f"original: {df.rdd.getNumPartitions()} partitions")
print(f"repartitioned: {df_repartitioned.rdd.getNumPartitions()} partitions")
print(f"coalesced: {df_coalesced.rdd.getNumPartitions()} partitions")



DataFrame[Name: string, ID: bigint]


In [None]:
# Ship a read-only list to the executors once, rather than inside every task closure.
broadcast_var = spark.sparkContext.broadcast([1, 2, 3])

rdd = spark.sparkContext.parallelize([1, 2, 3, 4])

# Tasks read the broadcast payload through `.value`; only its first element is used here.
scaled_rdd = rdd.map(lambda x: x * broadcast_var.value[0])
result = scaled_rdd.collect()
print(result)


[1, 2, 3, 4]


In [None]:
# Distribute a local Python list as an RDD, then pull every element back to the driver.
data = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(data)
collected = rdd.collect()
print(collected)


[1, 2, 3, 4, 5]


In [None]:
# map() is a lazy transformation; nothing runs until collect() (an action) is called.
rdd = spark.sparkContext.parallelize([1, 2, 3, 4])
doubled = rdd.map(lambda x: x * 2)
print(doubled.collect())


[2, 4, 6, 8]


In [None]:
# Build a DataFrame from row tuples plus column names; column types are inferred
# from the Python values.
data = [
    ("Alice", 34),
    ("Bob", 45),
]
df = spark.createDataFrame(data, ["Name", "Age"])
df.show()


+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
+-----+---+



In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# An empty DataFrame needs an explicit schema: there are no rows to infer column types from.
name_field = StructField("Name", StringType(), True)
age_field = StructField("Age", IntegerType(), True)
schema = StructType([name_field, age_field])

empty_df = spark.createDataFrame([], schema)
empty_df.show()


+----+---+
|Name|Age|
+----+---+
+----+---+



In [None]:
# An RDD of tuples becomes a DataFrame once its columns are named.
column_names = ["Name", "Age"]
rdd = spark.sparkContext.parallelize([("Alice", 34), ("Bob", 45)])
df = rdd.toDF(column_names)
df.show()


+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
+-----+---+



In [None]:
# Convert the Spark DataFrame into a pandas DataFrame on the driver.
# toPandas() collects every row locally, so use it only for data that fits in driver memory.
pandas_df = df.toPandas()

print(pandas_df)


    Name  Age
0  Alice   34
1    Bob   45


In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Rows and type imports are declared in this cell rather than borrowed from earlier
# cells, so the result does not depend on which cells ran before (e.g. a later cell
# rebinding `data` to a list of plain ints would otherwise break this one).
people_rows = [("Alice", 34), ("Bob", 45)]

# The explicit schema pins Age to IntegerType instead of letting Spark infer the type.
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True)
])
df = spark.createDataFrame(people_rows, schema)
df.show()



+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
+-----+---+



In [None]:
from pyspark.sql import Row

# Row objects carry their own field names, so toDF() needs no column list.
people = [Row(name="Alice", age=34), Row(name="Bob", age=45)]
rdd = spark.sparkContext.parallelize(people)
df = rdd.toDF()
df.show()


+-----+---+
| name|age|
+-----+---+
|Alice| 34|
|  Bob| 45|
+-----+---+



In [None]:
data2 = [("Alice", "New York"), ("Bob", "California")]
df2 = spark.createDataFrame(data2, ["Name", "City"])

# `df` (built from Row(name=..., age=...)) spells its key column `name`, while `df2`
# uses `Name`. Joining on the string "Name" only resolves while
# spark.sql.caseSensitive is false. An explicit condition on each side's own column
# works under either setting. Dropping df2's copy of the key leaves a single `name`
# column, matching the shape of the original key-name join.
df_joined = (
    df.join(df2, df["name"] == df2["Name"], "inner")
    .drop(df2["Name"])
)
df_joined.show()


+-----+---+----------+
| name|age|      City|
+-----+---+----------+
|Alice| 34|  New York|
|  Bob| 45|California|
+-----+---+----------+



In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType


def add_ten(x):
    """Return ``x + 10``, passing SQL NULL (``None``) through unchanged.

    Spark calls Python UDFs with ``None`` for null cells. Without the guard,
    ``None + 10`` raises TypeError and fails the whole job.
    """
    if x is None:
        return None
    return x + 10


# A native expression such as df["age"] + 10 would avoid shipping rows through Python;
# the UDF is kept deliberately to demonstrate udf().
add_ten_udf = udf(add_ten, IntegerType())

# The column in this DataFrame is `age` (lower case). Reference it exactly instead of
# relying on case-insensitive resolution of "Age".
df = df.withColumn("Age_plus_10", add_ten_udf(df["age"]))
df.show()


+-----+---+-----------+
| name|age|Age_plus_10|
+-----+---+-----------+
|Alice| 34|         44|
|  Bob| 45|         55|
+-----+---+-----------+



In [None]:
# flatMap lets each element expand into several outputs: every x yields x and 2x,
# and the results are flattened into a single RDD.
rdd = spark.sparkContext.parallelize([1, 2, 3])
flat_mapped_rdd = rdd.flatMap(lambda x: (x, 2 * x))
print(flat_mapped_rdd.collect())


[1, 2, 2, 4, 3, 6]


In [None]:
import os
from pathlib import Path

# Location of the exported CSV. Set the EXPORT_CSV environment variable to override it.
# The default is ~/Downloads/export.csv for whoever runs the notebook, instead of a
# hardcoded per-user absolute path.
csv_path = Path(
    os.environ.get("EXPORT_CSV", Path.home() / "Downloads" / "export.csv")
).expanduser().resolve()
if not csv_path.is_file():
    raise FileNotFoundError(f"CSV file not found: {csv_path}")

# Spark passes paths to Hadoop, which parses them as URIs. A raw Windows path such as
# C:\Users\... gets its backslashes percent-encoded and is rejected with
# "Relative path in absolute URI". Path.as_uri() yields a well-formed file:/// URI
# on any OS.
df_csv = spark.read.csv(csv_path.as_uri(), header=True, inferSchema=True)
df_csv.show()


IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: C:%5CUsers%5Chp%5CDownloads%5Cexport.csv