In [0]:
%run
 /Workspace/Users/anilkun42@gmail.com/transformation

## Action Functions


In [0]:
from pyspark.sql import DataFrame
from delta.tables import DeltaTable



def create_delta_table(df, table_path, mode="ignore", partition_cols=None):
    """Write ``df`` out in Delta format.

    Args:
        df: Spark DataFrame to write.
        table_path: Either a catalog table name (any string containing ".",
            e.g. ``catalog.schema.table``) written via ``saveAsTable``, or a
            storage path written via ``save``.
        mode: Spark save mode; the default "ignore" skips the write when the
            target already exists.
        partition_cols: Optional column name(s) forwarded to ``partitionBy``.
    """
    delta_writer = df.write.format("delta").mode(mode)
    if partition_cols:
        delta_writer = delta_writer.partitionBy(partition_cols)

    # NOTE(review): the "." heuristic means a storage path that itself contains
    # a dot is routed to saveAsTable — keep paths dot-free.
    is_table_name = "." in table_path
    if not is_table_name:
        delta_writer.save(table_path)
        return
    delta_writer.saveAsTable(table_path)
			
	
	
def optimize_delta_table(table_path, col_names=None):
    """Compact a Delta table's files, optionally Z-ordering by ``col_names``.

    Args:
        table_path: Catalog table name (any string containing ".") or a storage
            path, which is addressed as ``delta.`<path>```.
        col_names: Optional column name or list of names to Z-order by. When
            None/empty, a plain OPTIMIZE is issued.
    """
    # NOTE(review): same "." heuristic as the other helpers — a storage path
    # containing a dot is treated as a table name.
    target = table_path if "." in table_path else f"delta.`{table_path}`"

    statement = f"OPTIMIZE {target}"
    if col_names:
        if isinstance(col_names, str):
            col_names = [col_names]
        # One OPTIMIZE ... ZORDER BY already compacts and clusters the files;
        # a preceding plain OPTIMIZE would be a redundant extra rewrite.
        statement += f" ZORDER BY ({', '.join(col_names)})"
    spark.sql(statement)
		
		
def vacuum_delta_table(table_path, retention_hours=168):
    """Run VACUUM on a Delta table with an explicit retention window.

    Args:
        table_path: Catalog table name (any string containing ".") or a storage
            path, which is addressed as ``delta.`<path>```.
        retention_hours: Value for ``RETAIN <n> HOURS`` (168 hours = 7 days).
    """
    if "." in table_path:
        target = table_path
    else:
        target = f"delta.`{table_path}`"
    spark.sql(f"VACUUM {target} RETAIN {retention_hours} HOURS")
	
	
def read_delta_with_timetravel(table_path, version=None, timestamp=None):
    """Read a Delta table, optionally as of an earlier version or timestamp.

    Args:
        table_path: Catalog table name (any string containing ".") read via
            ``.table()``, or a storage path read via ``.load()``.
        version: Table version for ``versionAsOf``; 0 (the first commit) is a
            valid value and is honoured.
        timestamp: Value for ``timestampAsOf`` (ignored when falsy).

    Returns:
        The Spark DataFrame produced by the reader.
    """
    reader = spark.read.format("delta")
    # Compare against None so version 0 is not silently dropped.
    if version is not None:
        reader = reader.option("versionAsOf", version)
    if timestamp:
        reader = reader.option("timestampAsOf", timestamp)
    # DataFrameReader.load() interprets its argument as a path, so catalog
    # table names must go through .table() instead.
    if "." in table_path:
        return reader.table(table_path)
    return reader.load(table_path)
	
	
def delete_from_delta(table_path, condition):
    """Delete rows matching a SQL predicate from a Delta table.

    Args:
        table_path: Catalog table name (any string containing ".") deleted from
            via SQL ``DELETE``; otherwise a storage path handled through
            ``DeltaTable.forPath``.
        condition: SQL boolean expression, e.g. ``status = "inactive"``. It is
            interpolated verbatim, so only pass trusted expressions.
    """
    if "." not in table_path:
        DeltaTable.forPath(spark, table_path).delete(condition)
        return
    spark.sql(f"DELETE FROM {table_path} WHERE {condition}")
		
		
def evolve_delta_schema(table_path, merge_schema=True):
    """Toggle Delta automatic schema merging on the current Spark session.

    Sets ``spark.databricks.delta.schema.autoMerge.enabled`` to
    ``str(merge_schema).lower()`` ("true"/"false" for booleans).

    Args:
        table_path: Accepted so every action shares a ``table_path`` parameter;
            it is not used, because the setting goes to ``spark.conf``.
        merge_schema: Whether automatic schema merging should be enabled.
    """
    flag = str(merge_schema).lower()
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", flag)



def get_max_version(table_path):
    """Return the highest commit version recorded in a Delta table's history.

    Args:
        table_path: Catalog table name (any string containing ".") or a storage
            path, which is addressed as ``delta.`<path>```.

    Returns:
        The maximum ``version`` value from ``DESCRIBE HISTORY``.
    """
    # Resolve the target the same way as the other helpers; wrapping a catalog
    # name in delta.`...` would make Spark look it up as a filesystem path.
    target = table_path if "." in table_path else f"delta.`{table_path}`"
    history_df = spark.sql(f"DESCRIBE HISTORY {target}")
    return history_df.agg({"version": "max"}).collect()[0][0]
    
    

In [0]:
# Dispatch table used by apply_actions: the 'action' value of each step
# selects which helper runs, and the step's 'params' become its kwargs.
ACTIONS = dict(
    create=create_delta_table,
    optimize=optimize_delta_table,
    vacuum=vacuum_delta_table,
    read=read_delta_with_timetravel,
    delete=delete_from_delta,
    evolve=evolve_delta_schema,
    version=get_max_version,
)

In [0]:
def apply_actions(df, action_steps, actions=None):
    """Run a sequence of Delta actions in order, stopping at the first failure.

    Args:
        df: DataFrame passed as the ``df`` keyword to every 'create' step.
        action_steps: Iterable of dicts, each with an ``'action'`` key naming an
            entry of the registry and an optional ``'params'`` dict of keyword
            arguments for it.
        actions: Optional mapping of action name -> callable. Defaults to the
            module-level ``ACTIONS`` registry.

    Raises:
        RuntimeError: wrapping the underlying exception (including an unknown
            action name) and naming the failing 1-based step.
    """
    import logging

    registry = ACTIONS if actions is None else actions
    for step_num, step in enumerate(action_steps, start=1):
        action = step['action']
        # Copy so injecting ``df`` does not mutate the caller's step config.
        params = dict(step.get('params') or {})
        if action == 'create':
            params['df'] = df
        try:
            if action not in registry:
                raise ValueError(f"unknown action {action!r}; expected one of {sorted(registry)}")
            registry[action](**params)
        except Exception as e:
            logging.error(f"Delta operation failed at step {step_num} ({action}): {str(e)}")
            raise RuntimeError(f"Step {step_num} failed: {str(e)}") from e

In [0]:


# Ordered Delta operations consumed by apply_actions; uncomment entries to enable them.
# NOTE(review): "peer_csutomer" looks like a typo of "customer"; kept as-is because
# the %sql cell at the end of the notebook queries the same name.
TARGET_TABLE = 'peerisland.peer.peer_csutomer'

action_steps = [
    {'action': 'create', 'params': {'table_path': TARGET_TABLE}},
    {'action': 'optimize', 'params': {'table_path': TARGET_TABLE}},
    # {'action': 'vacuum', 'params': {'table_path': TARGET_TABLE}},
    # {'action': 'read', 'params': {'table_path': TARGET_TABLE, 'version': 1}},
    # {'action': 'delete', 'params': {'table_path': TARGET_TABLE, 'condition': 'status = "inactive"'}},
    # {'action': 'evolve', 'params': {'table_path': TARGET_TABLE}},
]

In [0]:
# Replace the space in "Subscription Date" before writing — presumably because
# Delta rejects spaces in column names without column mapping (TODO confirm).
final_df = (
    final_df
    .withColumnRenamed("Subscription Date", "Subscription_Date")
)
apply_actions(df=final_df, action_steps=action_steps)

In [0]:
%sql
-- Preview the table written by the 'create' step run through apply_actions.
SELECT *
FROM peerisland.peer.peer_csutomer