In [0]:
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt
from pyspark.sql.functions import *

In [0]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook under the application name
# "LoadCSV"; getOrCreate() hands back an already-running session if there is one.
spark = (
    SparkSession.builder
    .appName("LoadCSV")
    .getOrCreate()
)

In [0]:
# Load the sales CSV: the first line is the header and column types are inferred.
df_sales = spark.read.csv(
    '/Volumes/workspace/default/my/sales_data.csv',
    header=True,
    inferSchema=True,
)

In [0]:
# Load the customer/salesperson mapping CSV: the first line is the header and
# column types are inferred.
df_mapping = spark.read.csv(
    '/Volumes/workspace/default/my/customer_salesperson_mapping.csv',
    header=True,
    inferSchema=True,
)

In [0]:
# Render both freshly loaded inputs (sales first, then the mapping) for inspection.
for loaded_frame in (df_sales, df_mapping):
    loaded_frame.display()

In [0]:
%sql
-- Create the Delta table `sales_overview` unless it already exists.
-- The table properties switch on optimized writes, auto compaction and the
-- change data feed.
CREATE TABLE IF NOT EXISTS sales_overview (
  customer_id   INT,
  customer_name STRING,
  sales         INT
)
USING DELTA
TBLPROPERTIES (
  delta.autoOptimize.optimizeWrite = true,
  delta.autoOptimize.autoCompact   = true,
  delta.enableChangeDataFeed       = true
)

A Delta table in Databricks is a table stored in the Delta Lake format, an open-source storage layer that brings features like ACID transactions and time travel to data lakes. Delta tables store data as a directory of files, but their transaction log provides the reliability and performance of a database, making them the default and recommended table type in Databricks.

**Key features and benefits**
- **ACID transactions:** Ensures data reliability with strong isolation, allowing multiple users to perform operations like merge, update, and delete concurrently.
- **Time travel:** Enables you to query or revert to a previous version of the table, which is useful for audits or rolling back changes.
- **Unified batch and streaming:** Supports both batch and streaming data processing with "exactly-once" semantics, unifying ingestion and queries.
- **Scalable metadata:** Handles large tables with billions of partitions and files.
- **Schema enforcement and evolution:** Prevents bad data from corrupting tables and allows for schema changes over time.
- **Performance optimizations:** Provides features like data skipping (file pruning) and predicate pushdown to speed up queries.

In [0]:
# Inner-join sales rows to the mapping rows that share the same Salesperson.
df_join = df_sales.join(
    df_mapping,
    on=df_sales['Salesperson'] == df_mapping['Salesperson'],
    how='inner',
)

# Project the joined rows down to the single Date column.
df_date = df_join.select('Date')

In [0]:
%sql
-- Create the single-column Delta table `test` unless it already exists.
-- The table properties switch on optimized writes, auto compaction and the
-- change data feed.
CREATE TABLE IF NOT EXISTS test (
  date STRING
)
USING DELTA
TBLPROPERTIES (
  delta.autoOptimize.optimizeWrite = true,
  delta.autoOptimize.autoCompact   = true,
  delta.enableChangeDataFeed       = true
)

In [0]:
# Append the rows of df_date to the existing table "test".
# insertInto matches columns by position, not by name, and because this is an
# append, every re-run of the cell adds the same rows again.
df_date.write.insertInto("test", overwrite=False)

In [0]:
%sql
-- Show every row currently stored in the `test` table.
SELECT * FROM test;

In [0]:
# Inner-join sales (alias "a") to the salesperson mapping (alias "b") on
# Salesperson, keep the reporting columns, and give the space-containing
# columns snake_case names in the same projection.
# Region is taken explicitly from the sales side ("a") -- presumably because
# both inputs carry a Region column; confirm against the mapping file.
df_joined = (
    df_sales.alias("a")
    .join(
        df_mapping.alias("b"),
        on=df_sales["Salesperson"] == df_mapping["Salesperson"],
        how='inner',
    )
    .select(
        col("Date"),
        col("a.Region"),
        col("Product"),
        col("Units Sold").alias("units_sold"),
        col("Unit Price").alias("unit_price"),
        col("Total Revenue").alias("tot_rev"),
        col("Customer Name").alias("cust_nm"),
    )
)

In [0]:
from pyspark.sql.window import Window

# Per-customer window, most recent Date first.
# NOTE: with the default frame of an ordered window, rows that share the same
# Date within one customer are peers and receive the same running_sum.
window = Window.partitionBy("cust_nm").orderBy(col("Date").desc())

# Steps, in the order that keeps the window ordering chronological:
#   1. Parse Date with "yyyy-MM-dd". "MM" is month-of-year; the lowercase "mm"
#      used before means minute-of-hour, so string dates lost their month.
#   2. Compute running_sum and lead while Date is still a real date. Doing this
#      after formatting would sort on text such as "17/Jan/2023", i.e.
#      lexicographically rather than by calendar date.
#   3. Only then turn Date into its "d/MMM/y" display string.
# `sum` and `lead` are the pyspark.sql.functions versions brought in by the
# star import at the top of the notebook, not the Python builtins.
(
    df_joined
    .withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))
    .withColumn("running_sum", sum("tot_rev").over(window))
    .withColumn("lead", lead("tot_rev").over(window))
    .withColumn("Date", date_format(col("Date"), "d/MMM/y"))
    .display()
)