In [None]:
import pickle

In [None]:
# Small dict of parallel lists used as the payload for the pickle round trip.
d = dict(
    ID=[1, 2, 3],
    Dep=['IT', 'HR', 'FI'],
    Sal=[12000, 78906, 62427],
)

In [None]:
# Serialize the dict into an in-memory bytes object (nothing is written to disk).
d1 = pickle.dumps(d)

In [None]:
# Print the raw pickle bytes explicitly: a bare `d1` in the middle of a cell is
# evaluated and thrown away, because only the final expression is echoed.
print(d1)

# Round trip: deserializing gives back an equal dict.
# Only unpickle bytes you produced yourself; pickle can execute arbitrary code.
pickle.loads(d1)

In [None]:
# Shell escape: ask the OS where a `spark` installation lives, to find the path
# that is handed to findspark.init() below.
!whereis spark

In [None]:
import findspark

In [None]:
import os

# Prefer an explicitly configured SPARK_HOME so the notebook is not tied to one
# machine's directory layout; fall back to the original install location.
findspark.init(os.environ.get("SPARK_HOME", '/usr/local/spark'))

In [None]:
from pyspark.sql import SparkSession

# Settings applied to the builder before the session is created.
memory_settings = {
    "spark.executor.memory": "2g",
    "spark.driver.memory": "2g",
    "spark.memory.fraction": "0.6",
    "spark.memory.storageFraction": "0.5",
}

session_builder = SparkSession.builder.appName("MemoryManagementDemo")
for setting_name, setting_value in memory_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()

In [None]:
# Ten million consecutive ids plus their squares (a deliberately large DataFrame).
df = spark.range(0, 10_000_000)
df = df.withColumn("square", df.id * df.id)

In [None]:
# Pull the first four rows back to the driver as a list of Row objects.
df.limit(4).collect()

In [None]:
# cache() only marks the DataFrame for persistence and returns it; chaining an
# action forces the data to be computed and stored right away.
df.cache().count()

In [None]:
# Echo the active SparkSession object as the cell output.
spark

In [None]:
from pyspark.sql.functions import col

# Force a 200-partition shuffle, then count rows per last digit of the id.
last_digit = col("id") % 10
shuffled = df.repartition(200).groupBy(last_digit).count()

# Print the logical and physical plans, then run the job.
shuffled.explain(extended=True)
shuffled.show()

In [None]:
# Join the existing frame against a fresh id range of the same size and count
# the matches.
df2 = spark.range(0, 10_000_000)
joined = df.join(df2, on="id")
joined.count()

In [None]:
# spark.memory.offHeap.* is read once, when the SparkContext starts. Calling
# spark.conf.set() on a live session cannot change it (Spark 3 rejects the call
# with "Cannot modify the value of a Spark config"). To enable off-heap memory,
# stop the running session and start a new one with the settings in the builder.
# DataFrames created from the old session are no longer usable after this cell.
spark.stop()

spark = (
    SparkSession.builder
    .appName("MemoryManagementDemo")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "2g")
    .config("spark.memory.fraction", "0.6")
    .config("spark.memory.storageFraction", "0.5")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "512m")
    .getOrCreate()
)

In [None]:
# UDF: Python user-defined function vs. built-in column expressions

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Sample DataFrame: the integers 0..99 in a single column named "value"
df = spark.range(100).toDF("value")

# Define a UDF that labels even/odd
def label_even_odd(n):
    """Return "even" or "odd" for an integer, or None when the input is None.

    Spark passes SQL NULL to a Python UDF as None. `None % 2` would raise a
    TypeError and fail the whole job, so nulls are propagated instead.
    """
    if n is None:
        return None
    return "even" if n % 2 == 0 else "odd"

# Wrap the Python function as a Spark UDF that returns strings.
label_udf = udf(label_even_odd, returnType=StringType())

# Apply the UDF to every row and bring the result back to the driver.
df_with_udf = df.withColumn("label", label_udf(df.value))
df_with_udf.collect()

In [None]:
from pyspark.sql.functions import when

# Same labelling as the UDF, expressed with built-in column functions only.
is_even = (df["value"] % 2) == 0
df_fast = df.withColumn("label", when(is_even, "even").otherwise("odd"))
df_fast.collect()

In [None]:
# buckets
# Save `df` as a table hashed into 4 buckets on "value", sorted by "value"
# within each bucket. mode("overwrite") keeps the cell re-runnable: with the
# default save mode the write fails as soon as the table already exists.
(
    df.write
    .mode("overwrite")
    .bucketBy(4, "value")
    .sortBy("value")
    .saveAsTable("bucketed_table")
)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, when, col, split

# getOrCreate() hands back the session that is already running, if any;
# otherwise it starts a new one with this application name.
spark = (
    SparkSession.builder
    .appName("TransformDemo")
    .getOrCreate()
)

# Sample data: (name, comma-separated subjects, score)
columns = ["name", "subjects", "score"]
data = [
    ("John", "A,B,C", 85),
    ("Sara", "A", 92),
    ("Mike", "B,C", 70),
]
df = spark.createDataFrame(data, schema=columns)

# 1) Split the comma-separated subjects and emit one row per subject
subject_list = split(col("subjects"), ",")
df1 = df.withColumn("subject", explode(subject_list))

# 2) Derive a status column from the score with when/otherwise
passed = col("score") >= 80
df2 = df1.withColumn("status", when(passed, "Passed").otherwise("Retest"))

# Print the rows without truncating long cell values, then the formatted plan.
df2.show(truncate=False)
df2.explain(mode="formatted")

In [None]:
# Mark df2 for caching (cache() returns the same DataFrame) ...
df_cached = df2.cache()

# ... and run an action so the cached data is actually materialized.
df_cached.count()

In [None]:
# Request Kryo instead of the default Java serializer.
# NOTE(review): spark.serializer is read when the SparkContext starts, so
# setting it on an already-running session is not expected to take effect
# (recent Spark versions reject the call outright). Confirm against the Spark
# version in use. If Kryo is wanted, pass it through
# SparkSession.builder.config(...) in the cell that first creates the session.
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

In [None]:
# Salting Technique
from pyspark.sql.functions import array, concat_ws, explode, lit, rand

NUM_SALTS = 10
# df1/df2 have no "key" column, so joining on it cannot resolve. "name" is a
# column both frames share. NOTE(review): confirm it is the intended join key.
JOIN_KEY = "name"

# Skewed side: spread every key over NUM_SALTS sub-keys with a random salt.
df1_salted = df1.withColumn("salt", (rand() * NUM_SALTS).cast("int"))

# Other side: replicate each row once per possible salt value, so every salted
# row on the left still meets its partner. Drawing a random salt on both sides
# would keep only the pairs whose salts happen to agree and silently drop the
# rest of the matches.
all_salts = array(*[lit(salt) for salt in range(NUM_SALTS)])
df2_salted = df2.withColumn("salt", explode(all_salts))

df_joined = df1_salted.join(df2_salted, [JOIN_KEY, "salt"])

In [None]:
# Echo the active SparkSession object as the cell output.
spark

In [None]:
# header=true: take column names from the first line.
# inferSchema=true: let Spark assign numeric types. Without it every column is
# read as a string, and the later `Age > 30` filter has to compare through a
# cast instead of against a numeric column.
churn_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv('churn.csv')
)

In [None]:
# Overwrite any previous export so the cell can be re-run; with the default
# save mode the write fails once "churn1.parquet" already exists.
churn_df.write.mode("overwrite").parquet("churn1.parquet")

In [None]:
# Parquet: Save and query with predicate pushdown

parquet_df = spark.read.parquet("churn1.parquet")

# filter() returns a new DataFrame and leaves parquet_df untouched, so the plan
# has to be requested from the filtered result. Explaining parquet_df itself
# would show a plain scan with no filter at all.
older_than_30 = parquet_df.filter(col("Age") > 30)
older_than_30.explain()

In [None]:
# Read the session setting that controls Parquet filter pushdown and print it.
pushdown_enabled = spark.conf.get("spark.sql.parquet.filterPushdown")
print(pushdown_enabled)

In [None]:
# Fetch the first two rows to the driver as a list of Row objects.
parquet_df.head(2)

In [None]:
from pyspark.sql.functions import explode, split

# Streaming source: text files from the input directory, at most one new file
# per micro-batch.
lines = (
    spark.readStream
    .format("text")
    .option("maxFilesPerTrigger", 1)
    .load("/mnt/input/streaming/")
)

# Break each line on single spaces and emit one row per token
tokens = split(lines["value"], " ")
words = lines.select(explode(tokens).alias("word"))

# Running count per distinct word
wordCounts = words.groupBy("word").count()

# Console sink in "complete" output mode, with progress tracked in the
# checkpoint directory.
console_writer = (
    wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .option("checkpointLocation", "/mnt/checkpoints/wordcount/")
)
query = console_writer.start()

In [None]:
from pyspark.sql.functions import window

# Expects `df` to carry the columns: value, timestamp.
# NOTE(review): the `df` built in the earlier cells (name, subjects, score) has
# no "timestamp" column. Confirm which DataFrame this snippet is meant to use.

# Count rows per (5-minute window, value), with a 10-minute watermark on "timestamp".
aggregated = (
    df.withWatermark("timestamp", "10 minutes")
    .groupBy(window("timestamp", "5 minutes"), "value")
    .count()
)


In [None]:
# 1. Parquet copy of the DataFrame (replaces any previous output)
df.write.parquet("/tmp/users_parquet", mode="overwrite")

# 2. Delta copy of the same DataFrame (replaces any previous output)
df.write.save("/tmp/users_delta", format="delta", mode="overwrite")


In [None]:
# 3. Delta-specific operations
from delta.tables import DeltaTable

# Handle on the Delta table written to /tmp/users_delta
delta_table = DeltaTable.forPath(spark, "/tmp/users_delta")

# Time Travel: View previous version
# `.show` without parentheses only references the method and prints nothing;
# it has to be called.
spark.read.format("delta").option("versionAsOf", 0).load("/tmp/users_delta").show()

# Time travel by timestamp instead of version number (template, not executed):
# spark.read.format("delta") \
#     .option("timestampAsOf", "2024-06-01 10:00:00") \
#     .load("/path/to/table")

# Merge example (Upsert)
# NOTE(review): `updates_df` is still the `...` placeholder. Assign a real
# DataFrame (the merge condition below expects an `id` column) before running.
updates_df = ...

# Match target ("t") and source ("s") rows on id: update the matches, insert the rest.
upsert = delta_table.alias("t").merge(updates_df.alias("s"), "t.id = s.id")
upsert = upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll()
upsert.execute()

In [None]:
# Formatted physical plan for the filtered query, to look for pushed filters.
# NOTE(review): presumes `df` has an `age` column; confirm which DataFrame is meant.
over_30 = df.where("age > 30")
over_30.explain(mode="formatted")

# Files that back the DataFrame
df.inputFiles()

In [None]:
import dlt
from pyspark.sql.functions import col

# Bronze: the JSON files under /mnt/raw/orders, loaded as-is.
@dlt.table
def bronze_orders():
    return spark.read.json("/mnt/raw/orders")

# Silver: only the bronze rows whose status is "completed".
@dlt.table
def silver_orders():
    bronze = dlt.read("bronze_orders")
    return bronze.where(col("status") == "completed")

# Gold: summed `amount` per customer_id over the silver rows.
@dlt.table
def gold_order_totals():
    completed = dlt.read("silver_orders")
    return completed.groupBy("customer_id").agg({"amount": "sum"})