''' JOINS '''

In [1]:
from pyspark.sql import SparkSession

# Build the Spark session used by every cell below; getOrCreate() returns an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("joins")
    .getOrCreate()
)

In [2]:
# Sample data for the join examples. emp_id 3 exists only in `employee` and
# emp_id 4 exists only in `addresses`, so each join type gives a different result.
employee = spark.createDataFrame(
    [
        (1, "John", "Engineering"),
        (2, "Mike", "HR"),
        (3, "Sara", "Finance"),
    ],
    schema=["emp_id", "name", "department"],
)
addresses = spark.createDataFrame(
    [
        (1, "NY"),
        (2, "LA"),
        (4, "DC"),
    ],
    schema=["emp_id", "address"],
)

In [5]:
# Inner join: only emp_ids present in both frames survive.
result = employee.join(addresses, on="emp_id", how="inner")
print("Inner Join")
result.show()

Inner Join
+------+----+-----------+-------+
|emp_id|name| department|address|
+------+----+-----------+-------+
|     1|John|Engineering|     NY|
|     2|Mike|         HR|     LA|
+------+----+-----------+-------+



In [None]:
# Full outer join: every emp_id from either side; missing columns become NULL.
result = employee.join(addresses, on="emp_id", how="outer")
print("Outer Join")
result.show()

Outer Join
+------+----+-----------+-------+
|emp_id|name| department|address|
+------+----+-----------+-------+
|     1|John|Engineering|     NY|
|     2|Mike|         HR|     LA|
|     3|Sara|    Finance|   NULL|
|     4|NULL|       NULL|     DC|
+------+----+-----------+-------+



In [None]:
# Left outer join: every employee row, with NULL address when there is no match.
result = employee.join(addresses, on="emp_id", how="left_outer")
print("Left Join")
result.show()

Left Join
+------+----+-----------+-------+
|emp_id|name| department|address|
+------+----+-----------+-------+
|     1|John|Engineering|     NY|
|     2|Mike|         HR|     LA|
|     3|Sara|    Finance|   NULL|
+------+----+-----------+-------+



In [None]:
# Right outer join: every address row, with NULL employee columns when unmatched.
result = employee.join(addresses, on="emp_id", how="right_outer")
print("Right Join")
result.show()

Right Join
+------+----+-----------+-------+
|emp_id|name| department|address|
+------+----+-----------+-------+
|     1|John|Engineering|     NY|
|     2|Mike|         HR|     LA|
|     4|NULL|       NULL|     DC|
+------+----+-----------+-------+



In [8]:
# Left semi join: employee rows that have a matching address; only the
# employee columns are returned.
result = employee.join(addresses, on="emp_id", how="left_semi")
print("Left Semi Join")
result.show()

Left Semi Join
+------+----+-----------+
|emp_id|name| department|
+------+----+-----------+
|     1|John|Engineering|
|     2|Mike|         HR|
+------+----+-----------+



In [9]:
# Left anti join: employee rows with NO matching address.
result = employee.join(addresses, on="emp_id", how="left_anti")
print("Left Anti Join")
result.show()

Left Anti Join
+------+----+----------+
|emp_id|name|department|
+------+----+----------+
|     3|Sara|   Finance|
+------+----+----------+



In [None]:
#Joining on Multiple Columns
# Rows match only when every listed key column ("id" AND "first_name") is equal.
df1 = spark.createDataFrame([(1, "John", "Doe"), (2, "Mike", "Smith")], ["id", "first_name", "last_name"])
df2 = spark.createDataFrame([(1, "John", "Doe"), (2, "Mike", "Johnson")], ["id", "first_name", "last_name"])

# Both inputs also carry a non-key "last_name" column. Joining them as-is would
# give a result with two columns both named "last_name", and any later reference
# to "last_name" would be ambiguous. Rename the right-hand copy so every column
# in the result is addressable.
result = df1.join(
    df2.withColumnRenamed("last_name", "last_name_right"),
    on=["id", "first_name"],
)
result.show()

In [10]:
# Broadcast Joins
from pyspark.sql.functions import broadcast

# Hint that `addresses` is small enough to broadcast to every executor.
# No `how` is given, so this is an inner join.
result = employee.join(broadcast(addresses), on="emp_id")
result.show()

+------+----+-----------+-------+
|emp_id|name| department|address|
+------+----+-----------+-------+
|     1|John|Engineering|     NY|
|     2|Mike|         HR|     LA|
+------+----+-----------+-------+



Built-in Functions with Examples

In [11]:
from pyspark.sql.functions import initcap

# initcap capitalises the first letter of each word: "john doe" -> "John Doe".
df = spark.createDataFrame([("john doe",)], schema=["name"])
df.select(initcap("name")).show()

+-------------+
|initcap(name)|
+-------------+
|     John Doe|
+-------------+



In [12]:
from pyspark.sql.functions import concat

# concat glues the column values together with no separator: "HelloWorld".
df = spark.createDataFrame([("Hello", "World")], schema=["col1", "col2"])
df.select(concat("col1", "col2")).show()

+------------------+
|concat(col1, col2)|
+------------------+
|        HelloWorld|
+------------------+



In [13]:
# Import the functions module under an alias rather than
# `from pyspark.sql.functions import round`: importing `round` by name would
# shadow Python's built-in round() in every later cell.
from pyspark.sql import functions as F

df = spark.createDataFrame([(2.4567,)], ["value"])
# Round to 2 decimal places (HALF_UP) -> 2.46
df.select(F.round("value", 2)).show()

+---------------+
|round(value, 2)|
+---------------+
|           2.46|
+---------------+



In [14]:
from pyspark.sql.functions import ceil

# ceil rounds up to the next whole number: 2.4567 -> 3
df = spark.createDataFrame([(2.4567,)], schema=["value"])
df.select(ceil("value")).show()

+-----------+
|CEIL(value)|
+-----------+
|          3|
+-----------+



In [15]:
from pyspark.sql.functions import current_date

# Select from a one-row frame built right here instead of reusing `df` from an
# earlier cell, so the cell shows exactly one row regardless of execution order.
spark.range(1).select(current_date().alias("current_date")).show()
# Output: today's date, e.g. 2025-05-31

+------------+
|current_date|
+------------+
|  2025-05-31|
+------------+



In [16]:
from pyspark.sql.functions import when

df = spark.createDataFrame([(3,), (7,)], schema=["value"])

# Bucket each value: below 5 -> "low", everything else -> "high".
df.select(
    when(df["value"] < 5, "low")
    .otherwise("high")
    .alias("result")
).show()

+------+
|result|
+------+
|   low|
|  high|
+------+



Aggregation and Miscellaneous Functions

In [17]:
from pyspark.sql.functions import avg

# Mean of 1, 2, 3 -> 2.0
df = spark.createDataFrame([(1,), (2,), (3,)], schema=["value"])
df.agg(avg("value")).show()

+----------+
|avg(value)|
+----------+
|       2.0|
+----------+



In [18]:
# Use the aliased functions module: `from pyspark.sql.functions import sum`
# would shadow Python's built-in sum() in every later cell.
from pyspark.sql import functions as F

df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
# Total of 1 + 2 + 3 -> 6
df.agg(F.sum("value")).show()

+----------+
|sum(value)|
+----------+
|         6|
+----------+



In [19]:
from pyspark.sql.functions import coalesce

# Per row, take the first non-null value among col1, col2.
df = spark.createDataFrame([(None, "hello"), ("world", None)], schema=["col1", "col2"])
df.select(coalesce(df.col1, df.col2)).show()

+--------------------+
|coalesce(col1, col2)|
+--------------------+
|               hello|
|               world|
+--------------------+



In [20]:
from pyspark.sql.functions import isnull, isnotnull

df = spark.createDataFrame([(None, "hello"), ("world", None)], schema=["col1", "col2"])
# Two boolean columns: "is col1 null?" and "is col2 non-null?"
df.select(
    isnull(df.col1),
    isnotnull(df.col2),
).show()

+--------------+------------------+
|(col1 IS NULL)|(col2 IS NOT NULL)|
+--------------+------------------+
|          true|              true|
|         false|             false|
+--------------+------------------+



In [22]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def double_number(x):
    """Return ``x * 2``, passing ``None`` through unchanged.

    Spark hands a Python UDF ``None`` for NULL input values; without this
    guard ``None * 2`` raises TypeError and fails the Spark task.
    """
    if x is None:
        return None
    return x * 2

# Wrap the plain Python function as a DataFrame UDF returning integers.
double_udf = udf(double_number, returnType=IntegerType())

df = spark.createDataFrame([(1,), (2,), (3,)], schema=["value"])
df.withColumn("doubled_value", double_udf(df.value)).show()

+-----+-------------+
|value|doubled_value|
+-----+-------------+
|    1|            2|
|    2|            4|
|    3|            6|
+-----+-------------+



In [23]:
# Register the same Python function under a name callable from Spark SQL.
spark.udf.register("double_sql_udf", double_number, returnType=IntegerType())

df.createOrReplaceTempView("temp_table")
spark.sql(
    "SELECT value, double_sql_udf(value) as doubled_value FROM temp_table"
).show()

+-----+-------------+
|value|doubled_value|
+-----+-------------+
|    1|            2|
|    2|            4|
|    3|            6|
+-----+-------------+



Caching and Partitioning

In [None]:
# RDD holding the integers 1..99 (range's upper bound is exclusive).
rdd = spark.sparkContext.parallelize(range(1, 100))
# cache() = persist at the default MEMORY_ONLY level; the data is stored the
# first time an action computes it. The cell displays the returned RDD.
rdd.cache()


PythonRDD[172] at RDD at PythonRDD.scala:53

In [25]:
df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "A")], schema=["value", "type"])

# Mark the DataFrame for caching; is_cached reflects the flag right away.
df.cache()
print(df.is_cached)

True


In [None]:
# Drop cached data with unpersist(); the same method exists on RDDs and DataFrames.
df.unpersist()

In [None]:
from pyspark import StorageLevel

# `rdd` was already cache()d (MEMORY_ONLY) earlier. Spark refuses to change the
# storage level of an RDD that already has one ("Cannot change storage level of
# an RDD after it was already assigned a level"), so release it first.
# On an RDD that was never persisted, unpersist() is a harmless no-op.
rdd.unpersist()
rdd.persist(StorageLevel.MEMORY_AND_DISK)

In [26]:
data = range(1, 100)
rdd = spark.sparkContext.parallelize(data)

# No partition count given, so Spark chooses one (12 in the recorded run;
# presumably the default parallelism of this machine, so it will vary).
print(rdd.getNumPartitions())

12


In [27]:
# Request exactly 10 partitions through parallelize's numSlices argument.
rdd_custom = spark.sparkContext.parallelize(data, numSlices=10)
print(rdd_custom.getNumPartitions())

10


In [28]:
# Go from 10 down to 5 partitions. repartition() always shuffles; when only
# reducing the count, coalesce(5) reaches the same count without a full shuffle.
rdd_repartitioned = rdd_custom.repartition(numPartitions=5)
print(rdd_repartitioned.getNumPartitions())

5


In [30]:
# Partition data by column "type" when writing: one sub-directory per distinct
# value (type=A/, type=B/).
# mode("overwrite") keeps the cell re-runnable; the default save mode
# ("errorifexists") fails as soon as the output directory already exists.
# NOTE: on Windows, an UnsatisfiedLinkError from NativeIO$Windows.access0 means the
# Hadoop native binaries (winutils.exe / hadoop.dll) are missing or do not match
# the Hadoop version; that is a local setup issue, not a problem with this call.
df.write.mode("overwrite").partitionBy("type").parquet("./output/PARTIONED")

Py4JJavaError: An error occurred while calling o459.parquet.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:802)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)
