In [0]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("DatabricksTraining").getOrCreate()

# Load data from Hive tables
df_customers = spark.table("hive_metastore.default.customers")
df_order_items = spark.table("hive_metastore.default.order_items")
df_orders = spark.table("hive_metastore.default.orders")
df_product = spark.table("hive_metastore.default.product")

# Display the first few rows of each DataFrame to verify the data
print("Customers DataFrame:")
df_customers.show(5)

print("Order Items DataFrame:")
df_order_items.show(5)

print("Orders DataFrame:")
df_orders.show(5)

print("Product DataFrame:")
df_product.show(5)

# Print the schema of each DataFrame
print("Customers Schema:")
df_customers.printSchema()

print("Order Items Schema:")
df_order_items.printSchema()

print("Orders Schema:")
df_orders.printSchema()

print("Product Schema:")
df_product.printSchema()

# Cache the DataFrames for faster processing in subsequent operations
df_customers.cache()
df_order_items.cache()
df_orders.cache()
df_product.cache()

# Count the number of rows in each DataFrame
print("Number of rows in Customers:", df_customers.count())
print("Number of rows in Order Items:", df_order_items.count())
print("Number of rows in Orders:", df_orders.count())
print("Number of rows in Product:", df_product.count())

Customers DataFrame:
+---+-------------+--------------+------+----------+-------------+----------+--------------------+------+
| id| customername|         state|  city|created_on|date_of_birth|updated_on|               email|gender|
+---+-------------+--------------+------+----------+-------------+----------+--------------------+------+
|267|  Mala Pratap|Madhya Pradesh|Indore|2018-12-06|   1983-11-04|2018-12-06|Mala Pratap@outlo...|     f|
| 59|      Anudeep|Madhya Pradesh|Indore|2018-08-26|   1978-09-09|2018-08-26|                null|  null|
|273|Shakshi Sagar|      Nagaland|Kohima|2018-04-17|   1996-11-06|2019-03-27|     Sagar@gmail.com|     f|
|116| Ekta Chauhan|Madhya Pradesh|Indore|2018-06-28|   1987-04-20|2018-06-28|Ekta Chauhan@outl...|     f|
| 92|     Bhutekar|Madhya Pradesh|Indore|2019-01-04|   1989-10-08|2019-01-04|                null|  null|
+---+-------------+--------------+------+----------+-------------+----------+--------------------+------+
only showing top 5 rows



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when, coalesce, lit
from delta.tables import DeltaTable

# Create a Spark session
spark = SparkSession.builder \
    .appName("DatabricksTraining") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

def handle_nulls_and_types(df, table_name):
    if table_name == 'customers':
        return df.withColumn("id", col("id").cast("integer")) \
                 .withColumn("created_on", to_date(col("created_on"), "yyyy-MM-dd")) \
                 .withColumn("date_of_birth", to_date(col("date_of_birth"), "yyyy-MM-dd")) \
                 .withColumn("updated_on", to_date(col("updated_on"), "yyyy-MM-dd")) \
                 .withColumn("email", when(col("email") == "", None).otherwise(col("email"))) \
                 .withColumn("gender", when(col("gender") == "", None).otherwise(col("gender")))
    elif table_name == 'order_items':
        return df.withColumn("order_id", coalesce(col("order_id"), lit("Unknown"))) \
                 .withColumn("amount", col("amount").cast("double")) \
                 .withColumn("profit", col("profit").cast("double")) \
                 .withColumn("quantity", col("quantity").cast("integer")) \
                 .withColumn("product_id", col("product_id").cast("integer"))
    elif table_name == 'orders':
        return df.withColumn("order_date", to_date(col("order_date"), "dd-MM-yyyy")) \
                 .withColumn("customer_id", col("customer_id").cast("integer"))
    elif table_name == 'product':
        return df.withColumn("id", col("id").cast("integer"))
    else:
        return df

def ingest_to_bronze(df, table_name):
    bronze_path = f"/bronze/{table_name}"
    
    # Handle nulls and data types
    df = handle_nulls_and_types(df, table_name)
    
    # Write the data to Delta format with schema enforcement
    df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(bronze_path)
    
    # Create a Delta table
    delta_table = DeltaTable.forPath(spark, bronze_path)
    
    # Optimize the table
    delta_table.optimize().executeCompaction()
    
    print(f"Data ingested to Bronze layer: {bronze_path}")

# Load data from Hive tables and ingest to Bronze layer
tables = ["customers", "order_items", "orders", "product"]
for table in tables:
    try:
        df = spark.table(f"hive_metastore.default.{table}")
        ingest_to_bronze(df, table)
    except Exception as e:
        print(f"Error ingesting {table}: {str(e)}")

# Verify the ingested data
for table in tables:
    try:
        bronze_df = spark.read.format("delta").load(f"/bronze/{table}")
        print(f"\nBronze layer - {table} schema:")
        bronze_df.printSchema()
        print(f"Bronze layer - {table} count: {bronze_df.count()}")
        print(f"Sample data from Bronze layer - {table}:")
        bronze_df.show(5, truncate=False)
    except Exception as e:
        print(f"Error reading {table} from Bronze layer: {str(e)}")

Data ingested to Bronze layer: /bronze/customers
Data ingested to Bronze layer: /bronze/order_items
Data ingested to Bronze layer: /bronze/orders
Data ingested to Bronze layer: /bronze/product

Bronze layer - customers schema:
root
 |-- id: integer (nullable = true)
 |-- customername: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- created_on: date (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- updated_on: date (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)

Bronze layer - customers count: 401
Sample data from Bronze layer - customers:
+---+-------------+--------------+------+----------+-------------+----------+------------------------+------+
|id |customername |state         |city  |created_on|date_of_birth|updated_on|email                   |gender|
+---+-------------+--------------+------+----------+-------------+----------+------------------------+------+
|267|Mal

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, lit, year, to_date, regexp_extract, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("DatabricksTraining") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Define the schema for the audit log table
audit_schema = StructType([
    StructField("table_name", StringType(), False),
    StructField("check_type", StringType(), False),
    StructField("check_result", StringType(), False),
    StructField("affected_rows", IntegerType(), False),
    StructField("timestamp", TimestampType(), False)
])

# Create or get the audit log Delta table
audit_log_path = "/bronze/audit_log"
try:
    audit_log = DeltaTable.forPath(spark, audit_log_path)
except:
    empty_audit_df = spark.createDataFrame([], audit_schema)
    empty_audit_df.write.format("delta").save(audit_log_path)
    audit_log = DeltaTable.forPath(spark, audit_log_path)

def log_audit(table_name, check_type, check_result, affected_rows):
    # Create a DataFrame with a single row, using the current timestamp
    audit_df = spark.createDataFrame([(
        table_name,
        check_type,
        check_result,
        affected_rows,
        spark.sql("SELECT CURRENT_TIMESTAMP").collect()[0][0]
    )], audit_schema)
    
    # Merge the new audit entry into the audit log table
    audit_log.alias("audit").merge(
        audit_df.alias("updates"),
        "audit.table_name = updates.table_name AND audit.check_type = updates.check_type"
    ).whenNotMatchedInsertAll().execute()

def perform_customers_quality_checks(df):
    # Check for null values in all columns
    for column in df.columns:
        null_count = df.filter(col(column).isNull()).count()
        if null_count > 0:
            log_audit("customers", f"null_check_{column}", "FAIL", null_count)
    
    # Check for duplicates
    duplicate_count = df.count() - df.dropDuplicates().count()
    if duplicate_count > 0:
        log_audit("customers", "duplicate_records", "FAIL", duplicate_count)

    # Check for invalid dates of birth
    invalid_dob = df.filter(
        (col("date_of_birth").isNotNull()) & 
        ((col("date_of_birth") > current_timestamp()) | (year(col("date_of_birth")) < 1900))
    ).count()
    if invalid_dob > 0:
        log_audit("customers", "invalid_date_of_birth", "FAIL", invalid_dob)
    
    # Check for invalid email formats
    invalid_email = df.filter(
        (col("email").isNotNull()) & 
        (~col("email").rlike("^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"))
    ).count()
    if invalid_email > 0:
        log_audit("customers", "invalid_email_format", "FAIL", invalid_email)
    
    # Check for invalid gender values
    invalid_gender = df.filter(
        (col("gender").isNotNull()) & 
        (~col("gender").isin("m", "f", "M", "F"))
    ).count()
    if invalid_gender > 0:
        log_audit("customers", "invalid_gender", "FAIL", invalid_gender)

# Perform quality checks on the customers table
try:
    customers_df = spark.read.format("delta").load("/bronze/customers")
    print("Performing quality checks on Bronze layer - customers")
    perform_customers_quality_checks(customers_df)
except Exception as e:
    print(f"Error processing customers from Bronze layer: {str(e)}")

# Display the audit log
print("\nAudit Log:")
spark.read.format("delta").load(audit_log_path).orderBy("timestamp", ascending=False).show(truncate=False)

# Display sample data and schema for the customers table
try:
    customers_df = spark.read.format("delta").load("/bronze/customers")
    print("\nBronze layer - customers schema:")
    customers_df.printSchema()
    print(f"Bronze layer - customers count: {customers_df.count()}")
    print("Sample data from Bronze layer - customers:")
    customers_df.show(5, truncate=False)
except Exception as e:
    print(f"Error reading customers from Bronze layer: {str(e)}")

Performing quality checks on Bronze layer - customers

Audit Log:
+----------+--------------------+------------+-------------+-----------------------+
|table_name|check_type          |check_result|affected_rows|timestamp              |
+----------+--------------------+------------+-------------+-----------------------+
|customers |invalid_email_format|FAIL        |37           |2024-10-21 06:51:02.509|
|customers |null_check_gender   |FAIL        |158          |2024-10-21 06:50:53.406|
|customers |null_check_email    |FAIL        |20           |2024-10-21 06:50:46.544|
+----------+--------------------+------------+-------------+-----------------------+


Bronze layer - customers schema:
root
 |-- id: integer (nullable = true)
 |-- customername: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- created_on: date (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- updated_on: date (nullable = true)
 |-- email: string (null

In [0]:
# POTENTIAL ISSUES HAVE IDNTIFIED WITH QUALITY ISSUES, BUT SINCE THEY DO NOT AFFECT THE FINAL OUTPUT IN TERMS OF SALES PREDICTION FIGURES THEY REMAIN AS IT IS ,  
#TOTAL NUMBER OF PROBLEMATIC ROWS HAVE BEEN IDENTIFIED IN VARIOUS ASPECTS OF INCONSISTENCT AND THE ERRORS HAVE BEEN LOGGED

In [0]:
from pyspark.sql.functions import col, upper, trim, to_date, datediff, current_date
from delta.tables import DeltaTable

# Assuming 'spark' is already available in Databricks environment

# Read data from Bronze tables
customers_bronze = spark.read.format("delta").load("/bronze/customers")
orders_bronze = spark.read.format("delta").load("/bronze/orders")
order_items_bronze = spark.read.format("delta").load("/bronze/order_items")
products_bronze = spark.read.format("delta").load("/bronze/product")

# Optimized Customers table
customers_silver = customers_bronze.select(
    col("id").alias("customer_id"),
    trim(upper(col("customername"))).alias("customer_name"),
    trim(upper(col("state"))).alias("state"),
    trim(upper(col("city"))).alias("city")
)

# Optimized Orders table
orders_silver = orders_bronze.select(
    col("id").alias("order_id"),
    to_date(col("order_date")).alias("order_date"),
    col("customer_id")
)

# Optimized Order Items table
order_items_silver = order_items_bronze.select(
    col("order_id"),
    col("product_id"),
    col("quantity"),
    col("amount").alias("unit_price"),
    (col("amount") * col("quantity")).alias("total_amount"),
    col("profit")
)

# Optimized Products table
products_silver = products_bronze.select(
    col("id").alias("product_id"),
    trim(upper(col("category"))).alias("product_category"),
    trim(col("product")).alias("product_name")
)

# Join tables to form a comprehensive dataset
comprehensive_orders = orders_silver \
    .join(customers_silver, "customer_id") \
    .join(order_items_silver, "order_id") \
    .join(products_silver, "product_id") \
    .select(
        "order_id",
        "order_date",
        "customer_id",
        "customer_name",
        "state",
        "city",
        "product_id",
        "product_category",
        "product_name",
        "quantity",
        "unit_price",
        "total_amount",
        "profit"
    )

# Write transformed data to Silver layer
silver_path = "/silver/comprehensive_orders"
comprehensive_orders.write.format("delta").mode("overwrite").save(silver_path)
print(f"Data written to Silver layer: {silver_path}")

# Display sample data from Silver layer
print("\nSample data from Silver layer - comprehensive_orders:")
spark.read.format("delta").load(silver_path).show(50, truncate=False)

# Create Delta table for easier querying
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS comprehensive_orders
    USING DELTA
    LOCATION '{silver_path}'
""")

print("Silver layer table created and registered.")

# Verify the table creation
spark.sql("SHOW TABLES").show()

Data written to Silver layer: /silver/comprehensive_orders

Sample data from Silver layer - comprehensive_orders:
+--------+----------+-----------+-------------+----------------+----------+----------+----------------+----------------+--------+----------+------------+------+
|order_id|order_date|customer_id|customer_name|state           |city      |product_id|product_category|product_name    |quantity|unit_price|total_amount|profit|
+--------+----------+-----------+-------------+----------------+----------+----------+----------------+----------------+--------+----------+------------+------+
|B-25897 |2018-12-06|267        |MALA PRATAP  |MADHYA PRADESH  |INDORE    |11        |ELECTRONICS     |Electronic Games|6       |734.0     |4404.0      |213.0 |
|B-25897 |2018-12-06|267        |MALA PRATAP  |MADHYA PRADESH  |INDORE    |7         |CLOTHING        |Stole           |3       |43.0      |129.0       |5.0   |
|B-25897 |2018-12-06|267        |MALA PRATAP  |MADHYA PRADESH  |INDORE    |1     

In [0]:
from pyspark.sql.functions import sum, count, avg, desc, dense_rank
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Read data from the Silver layer
silver_df = spark.read.format("delta").load("/silver/comprehensive_orders")

# Function to write data to Gold layer
def write_to_gold(df, table_name):
    gold_path = f"/gold/{table_name}"
    df.write.format("delta").mode("overwrite").save(gold_path)
    print(f"Data written to Gold layer: {gold_path}")
    
    # Create Delta table for easier querying
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_name}
        USING DELTA
        LOCATION '{gold_path}'
    """)

In [0]:
%sql
-- 1. Find the total profit generated by each product category
SELECT 
    product_category,
    profit,
    SUM(profit) OVER (PARTITION BY product_category) AS total_profit
FROM 
    default.comprehensive_orders
ORDER BY 
    total_profit DESC;


product_category,profit,total_profit
CLOTHING,5.0,11163.0
CLOTHING,11.0,11163.0
CLOTHING,10.0,11163.0
CLOTHING,4.0,11163.0
CLOTHING,-5.0,11163.0
CLOTHING,22.0,11163.0
CLOTHING,8.0,11163.0
CLOTHING,-3.0,11163.0
CLOTHING,-141.0,11163.0
CLOTHING,-27.0,11163.0


In [0]:
%sql
-- 2. Number of orders placed by customers in each state
SELECT 
    state,
    COUNT(DISTINCT order_id) as order_count
FROM comprehensive_orders
GROUP BY state
ORDER BY order_count DESC;

state,order_count
MADHYA PRADESH,101
MAHARASHTRA,90
RAJASTHAN,32
GUJARAT,27
PUNJAB,25
WEST BENGAL,22
DELHI,22
UTTAR PRADESH,22
KARNATAKA,21
BIHAR,16


In [0]:
%sql
-- 3. Total amount spent by each customer
SELECT 
    customer_id,
    customer_name,
    ROUND(SUM(total_amount), 2) as total_spent
FROM comprehensive_orders
GROUP BY customer_id, customer_name
ORDER BY total_spent DESC;

customer_id,customer_name,total_spent
397,YAANVI,103435.0
296,SHRIMATI SEEMA,64222.0
343,SOUMYA,42906.0
306,SHISHUL PAL,38896.0
360,REETA AND HER DOUGHTER SURABHI,34690.0
290,SARITA DEVI,31837.0
201,MUKESH DEVI,30708.0
368,SWAPNIL,30437.0
118,FARAH KHAN,30271.0
239,SMT. POOJA URF RANI,27928.0


In [0]:
%sql
-- 4. Average profit per order for each city
SELECT 
    city,
    ROUND(AVG(profit), 2) as avg_profit_per_order
FROM comprehensive_orders
GROUP BY city
ORDER BY avg_profit_per_order DESC;

city,avg_profit_per_order
ALLAHABAD,102.7
UDAIPUR,67.0
PUNE,54.69
SURAT,53.8
DELHI,43.31
THIRUVANANTHAPURAM,41.58
KOLKATA,39.68
AMRITSAR,36.27
SIMLA,22.62
GANGTOK,16.71


In [0]:
%sql
-- 5. Top 5 customers who spent the most
SELECT 
    customer_id,
    customer_name,
    ROUND(SUM(total_amount), 2) as total_spent
FROM comprehensive_orders
GROUP BY customer_id, customer_name
ORDER BY total_spent DESC
LIMIT 5;

customer_id,customer_name,total_spent
397,YAANVI,103435.0
296,SHRIMATI SEEMA,64222.0
343,SOUMYA,42906.0
306,SHISHUL PAL,38896.0
360,REETA AND HER DOUGHTER SURABHI,34690.0


In [0]:
%sql
-- 6. Total revenue generated by each product
SELECT 
    product_id,
    product_name,
    ROUND(SUM(total_amount), 2) as total_revenue
FROM comprehensive_orders
GROUP BY product_id, product_name
ORDER BY total_revenue DESC;


product_id,product_name,total_revenue
13,Printers,307963.0
14,Bookcases,295598.0
4,Saree,263523.0
15,Chairs,206479.0
11,Electronic Games,204850.0
12,Phones,200893.0
9,Trousers,124640.0
10,Accessories,102877.0
17,Tables,90706.0
7,Stole,86155.0


In [0]:
%sql
-- 7. Number of orders placed in each product category
SELECT 
    product_category,
    COUNT(DISTINCT order_id) as order_count
FROM comprehensive_orders
GROUP BY product_category
ORDER BY order_count DESC;

product_category,order_count
CLOTHING,393
ELECTRONICS,204
FURNITURE,186


In [0]:
%sql
-- 8. Average profit per order for each product category
SELECT 
    product_category,
    ROUND(AVG(profit), 2) as avg_profit_per_order
FROM comprehensive_orders
GROUP BY product_category
ORDER BY avg_profit_per_order DESC;

product_category,avg_profit_per_order
ELECTRONICS,34.07
CLOTHING,11.76
FURNITURE,9.46


In [0]:
%sql
-- 9. Total amount spent in each city
SELECT 
    city,
    ROUND(SUM(total_amount), 2) as total_spent
FROM comprehensive_orders
GROUP BY city
ORDER BY total_spent DESC;

city,total_spent
INDORE,455989.0
MUMBAI,293206.0
PUNE,174454.0
ALLAHABAD,120812.0
CHANDIGARH,115237.0
DELHI,106037.0
BHOPAL,104730.0
HYDERABAD,82897.0
AHMEDABAD,69346.0
BANGALORE,66231.0


In [0]:
%sql
-- 10. Number of orders placed for each product
SELECT 
    product_id,
    product_name,
    COUNT(DISTINCT order_id) as order_count
FROM comprehensive_orders
GROUP BY product_id, product_name
ORDER BY order_count DESC;

product_id,product_name,order_count
7,Stole,157
4,Saree,156
1,Hankerchief,138
11,Electronic Games,73
14,Bookcases,72
12,Phones,71
8,T-shirt,70
13,Printers,67
5,Shirt,66
16,Furnishings,66


The data seems promising so we export the gold layer in the end to implemen in powerBI for better visuals and implementation of the same


In [0]:
def export_comprehensive_data():
    print("Exporting comprehensive orders data...")
    
    # Read from silver layer
    comprehensive_orders = spark.read.format("delta").load("/silver/comprehensive_orders")
    
    # Show sample and count
    print("\nSample data from comprehensive orders:")
    comprehensive_orders.show(5)
    print(f"Total rows: {comprehensive_orders.count()}")
    
    # Export to CSV
    output_path = "/FileStore/powerbi_analysis/powerbi_comprehensive_orders"
    comprehensive_orders.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)
    print(f"Exported comprehensive data to {output_path}")
    
    # Create temp view for subsequent queries
    comprehensive_orders.createOrReplaceTempView("comprehensive_orders")
    return comprehensive_orders

# 2. Export Analysis Tables (using your existing analysis tables code)
def export_analysis_table(query, table_name):
    print(f"\nProcessing {table_name}...")
    
    df = spark.sql(query)
    print(f"\nSample data from {table_name}:")
    df.show(5)
    print(f"Total rows in {table_name}: {df.count()}")
    
    output_path = f"/FileStore/powerbi_analysis/{table_name}"
    df.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)
    print(f"Exported to {output_path}")
    return df

# Execute the exports
print("Starting data export process...")

# First export comprehensive data
comprehensive_df = export_comprehensive_data()

# Now export all analysis tables
analysis_queries = {
    "powerbi_category_analysis": """
        SELECT 
            product_category,
            ROUND(SUM(profit), 2) as total_profit,
            COUNT(DISTINCT order_id) as order_count,
            ROUND(SUM(total_amount), 2) as total_revenue,
            ROUND(AVG(profit), 2) as avg_profit_per_order
        FROM comprehensive_orders
        GROUP BY product_category
    """,
    
    "powerbi_customer_analysis": """
        SELECT 
            customer_id,
            customer_name,
            state,
            city,
            ROUND(SUM(total_amount), 2) as total_spent,
            COUNT(DISTINCT order_id) as order_count,
            ROUND(AVG(profit), 2) as avg_profit_per_customer
        FROM comprehensive_orders
        GROUP BY customer_id, customer_name, state, city
    """,
    
    "powerbi_product_analysis": """
        SELECT 
            product_id,
            product_name,
            product_category,
            SUM(quantity) as total_quantity,
            ROUND(SUM(total_amount), 2) as total_revenue,
            ROUND(SUM(profit), 2) as total_profit,
            COUNT(DISTINCT order_id) as number_of_orders
        FROM comprehensive_orders
        GROUP BY product_id, product_name, product_category
    """,
    
    "powerbi_geographic_analysis": """
        SELECT 
            state,
            city,
            COUNT(DISTINCT order_id) as order_count,
            COUNT(DISTINCT customer_id) as customer_count,
            ROUND(SUM(total_amount), 2) as total_revenue,
            ROUND(SUM(profit), 2) as total_profit,
            ROUND(AVG(total_amount), 2) as avg_order_value
        FROM comprehensive_orders
        GROUP BY state, city
    """,
    
    "powerbi_time_analysis": """
        SELECT 
            order_date,
            COUNT(DISTINCT order_id) as daily_orders,
            COUNT(DISTINCT customer_id) as daily_customers,
            ROUND(SUM(total_amount), 2) as daily_revenue,
            ROUND(SUM(profit), 2) as daily_profit
        FROM comprehensive_orders
        GROUP BY order_date
        ORDER BY order_date
    """
}

# Execute all analysis exports
analysis_dfs = {name: export_analysis_table(query, name) 
                for name, query in analysis_queries.items()}

# Create catalog of exported files
catalog_data = [
    ("powerbi_comprehensive_orders", "Complete raw data with all orders and details"),
] + [(name, "Analysis table: " + name.replace("powerbi_", "").replace("_", " ")) 
     for name in analysis_queries.keys()]

catalog_df = spark.createDataFrame(catalog_data, ["file_name", "description"])

print("\nCatalog of exported files:")
catalog_df.show(truncate=False)

print("\nData export complete! Files are ready in /FileStore/powerbi_analysis/")

Starting data export process...
Exporting comprehensive orders data...

Sample data from comprehensive orders:
+--------+----------+-----------+-------------+--------------+------+----------+----------------+----------------+--------+----------+------------+------+
|order_id|order_date|customer_id|customer_name|         state|  city|product_id|product_category|    product_name|quantity|unit_price|total_amount|profit|
+--------+----------+-----------+-------------+--------------+------+----------+----------------+----------------+--------+----------+------------+------+
| B-25897|2018-12-06|        267|  MALA PRATAP|MADHYA PRADESH|INDORE|        11|     ELECTRONICS|Electronic Games|       6|     734.0|      4404.0| 213.0|
| B-25897|2018-12-06|        267|  MALA PRATAP|MADHYA PRADESH|INDORE|         7|        CLOTHING|           Stole|       3|      43.0|       129.0|   5.0|
| B-25897|2018-12-06|        267|  MALA PRATAP|MADHYA PRADESH|INDORE|         1|        CLOTHING|     Hankerchief|

In [0]:
# List all files in the powerbi_analysis directory
print("Files in powerbi_analysis directory:")
files = dbutils.fs.ls("/FileStore/powerbi_analysis/")

# Function to display data from each analysis file
def display_analysis_file(file_path):
    print(f"\nDisplaying data from: {file_path}")
    # Read the data
    df = spark.read.format("csv").option("header", "true").load(file_path)
    # Display for downloading
    display(df)

# Iterate through each directory in powerbi_analysis
for file in files:
    if file.name.startswith("powerbi_"):
        # Get all files in this directory
        analysis_files = dbutils.fs.ls(file.path)
        # Find the CSV file (not _SUCCESS or other system files)
        csv_files = [f for f in analysis_files if not f.name.startswith("_") and not f.name.startswith(".")]
        if csv_files:
            # Display the data for downloading
            display_analysis_file(csv_files[0].path)

Files in powerbi_analysis directory:

Displaying data from: dbfs:/FileStore/powerbi_analysis/powerbi_category_analysis/part-00000-tid-3706926620485754977-e0ab2d01-715e-4954-8297-6ad5c9008ee0-590-1-c000.csv


product_category,total_profit,order_count,total_revenue,avg_profit_per_order
ELECTRONICS,10494.0,204,816583.0,34.07
FURNITURE,2298.0,186,665765.0,9.46
CLOTHING,11163.0,393,664522.0,11.76



Displaying data from: dbfs:/FileStore/powerbi_analysis/powerbi_comprehensive_orders/part-00000-tid-2232241294711308768-97dc0694-719d-4a87-aa2f-f95404d46fb9-581-1-c000.csv


order_id,order_date,customer_id,customer_name,state,city,product_id,product_category,product_name,quantity,unit_price,total_amount,profit
B-25897,2018-12-06,267,MALA PRATAP,MADHYA PRADESH,INDORE,11,ELECTRONICS,Electronic Games,6,734.0,4404.0,213.0
B-25897,2018-12-06,267,MALA PRATAP,MADHYA PRADESH,INDORE,7,CLOTHING,Stole,3,43.0,129.0,5.0
B-25897,2018-12-06,267,MALA PRATAP,MADHYA PRADESH,INDORE,1,CLOTHING,Hankerchief,5,24.0,120.0,11.0
B-25897,2018-12-06,267,MALA PRATAP,MADHYA PRADESH,INDORE,1,CLOTHING,Hankerchief,3,33.0,99.0,10.0
B-25762,2018-08-26,59,ANUDEEP,MADHYA PRADESH,INDORE,11,ELECTRONICS,Electronic Games,7,1316.0,9212.0,-527.0
B-25762,2018-08-26,59,ANUDEEP,MADHYA PRADESH,INDORE,6,CLOTHING,Skirt,3,27.0,81.0,4.0
B-25762,2018-08-26,59,ANUDEEP,MADHYA PRADESH,INDORE,4,CLOTHING,Saree,2,98.0,196.0,-5.0
B-25617,2018-04-17,273,SHAKSHI SAGAR,NAGALAND,KOHIMA,11,ELECTRONICS,Electronic Games,5,305.0,1525.0,-270.0
B-26090,2019-03-27,273,SHAKSHI SAGAR,NAGALAND,KOHIMA,7,CLOTHING,Stole,3,80.0,240.0,22.0
B-26027,2019-02-20,273,SHAKSHI SAGAR,NAGALAND,KOHIMA,8,CLOTHING,T-shirt,4,54.0,216.0,8.0



Displaying data from: dbfs:/FileStore/powerbi_analysis/powerbi_customer_analysis/part-00000-tid-7308876733058259855-caf79e7d-9b4c-4504-9daa-41ae39f53165-599-1-c000.csv


customer_id,customer_name,state,city,total_spent,order_count,avg_profit_per_customer
389,VIJAY KUMAR GHAI,JAMMU AND KASHMIR,KASHMIR,12975.0,1,-41.71
385,VAIBHAV,MADHYA PRADESH,INDORE,24483.0,1,34.43
380,TULIKA,MADHYA PRADESH,BHOPAL,25019.0,2,-8.56
331,SMT. SHWETA GUPTA,MADHYA PRADESH,INDORE,92.0,1,0.0
218,NRIPRAJ,PUNJAB,CHANDIGARH,8209.0,1,56.5
266,MALA BAGGA,HIMACHAL PRADESH,SIMLA,4361.0,1,26.33
214,NISHITH BHATTACHARYA,MAHARASHTRA,MUMBAI,3441.0,2,58.0
199,MRUNAL,MAHARASHTRA,MUMBAI,2361.0,1,23.4
228,PAROMITA,PUNJAB,AMRITSAR,5677.0,1,154.0
299,SHARDANAND,KERALA,THIRUVANANTHAPURAM,6054.0,2,79.6



Displaying data from: dbfs:/FileStore/powerbi_analysis/powerbi_geographic_analysis/part-00000-tid-2895117109601696779-b76bec75-2e3a-46eb-8d77-33ca8a62c4bf-617-1-c000.csv


state,city,order_count,customer_count,total_revenue,total_profit,avg_order_value
GUJARAT,AHMEDABAD,17,13,69346.0,-880.0,1118.48
KARNATAKA,BANGALORE,21,15,66231.0,645.0,1226.5
JAMMU AND KASHMIR,KASHMIR,14,8,53201.0,8.0,1085.73
MADHYA PRADESH,BHOPAL,22,16,104730.0,871.0,1586.82
MADHYA PRADESH,INDORE,76,63,455989.0,4159.0,1707.82
UTTAR PRADESH,LUCKNOW,13,11,29220.0,156.0,768.95
WEST BENGAL,KOLKATA,22,16,58035.0,2500.0,921.19
RAJASTHAN,JAIPUR,19,13,42601.0,-753.0,968.2
PUNJAB,AMRITSAR,9,9,17245.0,544.0,1149.67
HARYANA,CHANDIGARH,14,10,54891.0,1325.0,2111.19



Displaying data from: dbfs:/FileStore/powerbi_analysis/powerbi_product_analysis/part-00000-tid-862324434966130922-0c7d16b5-d181-498c-b60e-4d2e2d904742-608-1-c000.csv


product_id,product_name,product_category,total_quantity,total_revenue,total_profit,number_of_orders
17,Tables,FURNITURE,61,90706.0,-4011.0,16
11,Electronic Games,ELECTRONICS,297,204850.0,-1236.0,73
10,Accessories,ELECTRONICS,262,102877.0,3559.0,65
15,Chairs,FURNITURE,277,206479.0,577.0,64
13,Printers,ELECTRONICS,291,307963.0,5964.0,67
1,Hankerchief,CLOTHING,754,75518.0,2098.0,138
9,Trousers,CLOTHING,135,124640.0,2847.0,37
3,Leggings,CLOTHING,186,9061.0,260.0,49
12,Phones,ELECTRONICS,304,200893.0,2207.0,71
5,Shirt,CLOTHING,271,39373.0,1131.0,66



Displaying data from: dbfs:/FileStore/powerbi_analysis/powerbi_time_analysis/part-00000-tid-486356994146579581-9518a08f-4baa-441a-abdc-e1f87373df17-628-1-c000.csv


order_date,daily_orders,daily_customers,daily_revenue,daily_profit
2018-04-01,2,2,25158.0,-243.0
2018-04-03,2,2,10629.0,-158.0
2018-04-05,1,1,525.0,0.0
2018-04-06,2,2,374.0,19.0
2018-04-08,1,1,14669.0,-1456.0
2018-04-09,2,2,11987.0,-722.0
2018-04-11,1,1,320.0,-59.0
2018-04-12,2,2,14945.0,-55.0
2018-04-13,1,1,2172.0,42.0
2018-04-15,2,2,1238.0,-28.0
