# Setting Up Volume And Loading Data

In [0]:
# spark.sql("CREATE VOLUME IF NOT EXISTS workspace.default.my_vol")
from pyspark.sql.functions import col


# Load the CSV data stored under the volume path. The first row supplies the
# column names, and "inferSchema" asks Spark to derive column types from the
# data. The option key was previously misspelled, so it was never applied.
df = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("/Volumes/workspace/default/my_vol")
)

# NOTE(review): "my_table" is looked up here BEFORE a temp view with the same
# name is registered below. Presumably df1 is meant to be the persisted
# catalog table rather than the temp view -- confirm, and consider a fully
# qualified table name to avoid any ambiguity.
df1 = spark.table("my_table")

# createOrReplaceTempView returns None, so its result is not bound to a name.
df.createOrReplaceTempView("my_table")
# df.select("movie_title","movie_averageRating","movie_numerOfVotes","production_date","runtime_minutes").write.format("delta").saveAsTable("my_table")

# Two narrower temp views sharing the "movie_title" column; they are the
# inputs of the join examples.
df.select("movie_title", "production_date", "genres").createOrReplaceTempView("v1")
df.select("movie_title", "director_name", "director_professions").createOrReplaceTempView("v2")
v1 = spark.table("v1")
v2 = spark.table("v2")


# DML command function setup

In [0]:
def select_cols(*args, table):
    """Build a ``SELECT`` statement for the given columns of ``table``.

    Args:
        *args: Column names or SQL expressions, emitted in the given order
            and separated by ``", "``.
        table: Keyword-only name of the table or view to select from.

    Returns:
        The SQL text ``SELECT <columns> FROM <table>``.

    Note:
        The arguments are interpolated into the statement verbatim, with no
        quoting or escaping, so only pass trusted identifiers.
    """
    return f"SELECT {', '.join(args)} FROM {table}"


# Defining helper functions for DDL manipulations

In [0]:
def add_cols(table, *args):
    """Build an ``ALTER TABLE ... ADD COLUMNS`` statement.

    Args:
        table: Name of the table to alter.
        *args: Column definitions such as ``"new_col1 int"``, joined
            with ``", "``.

    Returns:
        The SQL text of the statement. Values are interpolated verbatim, so
        only pass trusted identifiers.
    """
    return f"ALTER TABLE {table} ADD COLUMNS ({', '.join(args)})"


def del_cols(table, *args):
    """Build an ``ALTER TABLE ... DROP COLUMNS`` statement.

    Args:
        table: Name of the table to alter.
        *args: Names of the columns to drop, joined with ``","``.

    Returns:
        The SQL text of the statement. Values are interpolated verbatim, so
        only pass trusted identifiers.
    """
    return f"ALTER TABLE {table} DROP COLUMNS ({','.join(args)})"


def rename_col(old_name, new_name, table):
    """Build an ``ALTER TABLE ... RENAME COLUMN`` statement.

    Args:
        old_name: Current column name.
        new_name: Name the column should have afterwards.
        table: Name of the table to alter (note: last parameter here).

    Returns:
        The SQL text of the statement. Values are interpolated verbatim, so
        only pass trusted identifiers.
    """
    return f"ALTER TABLE {table} RENAME COLUMN {old_name} TO {new_name}"


def modify_col(col_name, col_metadata, table):
    """Build an ``ALTER TABLE ... MODIFY`` statement for one column.

    Args:
        col_name: Name of the column to modify.
        col_metadata: Text placed after the column name, e.g. ``"int"``.
        table: Name of the table to alter (note: last parameter here).

    Returns:
        The SQL text of the statement. Values are interpolated verbatim, so
        only pass trusted identifiers.
    """
    # NOTE(review): Spark SQL documents ALTER COLUMN / CHANGE COLUMN rather
    # than MODIFY -- confirm the target engine accepts this syntax.
    return f"ALTER TABLE {table} MODIFY {col_name} {col_metadata}"

## Applying the DDL transformations

In [0]:

# spark.sql(add_cols("workspace.default.my_table","new_col1 int","new_col2 string"))
# spark.sql(del_cols("workspace.default.my_table","movie_averageRating","movie_numerOfVotes"))
# spark.sql(rename_col("new_col1","new_col3","workspace.default.my_table"))
# spark.sql(modify_col("new_col3","int","workspace.default.my_table"))
# spark.sql("SELECT * FROM workspace.default.my_table").printSchema()

In [0]:
# df1 = df1.drop("movie_averageRating","movie_numerOfVotes")
# Rename new_col1 to new_col3, then cast the renamed column to int.
df1 = df1.withColumnRenamed("new_col1", "new_col3")
df1 = df1.withColumn("new_col3", col("new_col3").cast("int"))

# Persist the result with a single overwrite. The earlier extra write used the
# default save mode, which raises when "my_table" already exists, and would
# otherwise just have been rewritten by this overwrite immediately afterwards.
# overwriteSchema lets Delta replace the stored schema, which the rename and
# cast above change.
df1.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("my_table")

In [0]:
# Render all columns of the transformed DataFrame in the notebook output.
display(df1.select(col("*")))

In [0]:
# Join the two temp views on their shared "movie_title" column using each of
# the four join types, printing a label before showing each result.

# Inner: only titles present in both v1 and v2.
inner_join = v1.join(v2, "movie_title", "inner")
print("Inner Join Result:")
inner_join.show()

# Left: every v1 row, with nulls where v2 has no matching title.
left_join = v1.join(v2, "movie_title", "left")
print("Left Join Result:")
left_join.show()

# Right: every v2 row, with nulls where v1 has no matching title.
right_join = v1.join(v2, "movie_title", "right")
print("Right Join Result:")
right_join.show()

# Full outer: every row from both sides, matched where possible.
full_outer_join = v1.join(v2, "movie_title", "outer")
print("Full Outer Join Result:")
full_outer_join.show()