# 🚀 Databricks Project Setup & Pipeline Guide: alldatatech_project

This document provides step-by-step instructions for setting up the environment, ingesting initial data, and configuring the monthly extraction pipeline within **Databricks** using **Unity Catalog**.

---

## 1. ⚙️ Initial Environment Setup & Data Ingestion

The steps below are executed using the **`alldatatech_project Setup and Data Ingestion.ipynb`** notebook.

### 1.1 Pre-Requisite Actions

1.  **Download Files:** Download all necessary project files from the **GitHub** repository to your local machine.
2.  **Import Notebook:** Import the notebook named **`alldatatech_project Setup and Data Ingestion.ipynb`** into your Databricks workspace.

### 1.2 Notebook Execution Steps

Execute the query cells in the imported setup notebook **sequentially** to establish the necessary database objects and prepare for data ingestion.

| Step # | Action | Description |
| :--- | :--- | :--- |
| **1** | Creating a New Spark **Catalog** | Defines the top-level namespace (e.g., `alldatatech_project`) in Unity Catalog. |
| **2** | Creating a New Spark **Schema** (Database) | Creates a schema within the catalog (e.g., `sales_reporting`) to organize tables. |
| **3** | Creating the **`regional_sales_data`** Delta Table | Defines the target Delta Lake table for the ingested data. |
| **4** | Creating a Unity Catalog **Volume** | Sets up a Unity Catalog **Volume** to manage access to cloud storage for raw files (e.g., CSVs). |
| **5** | **Upload Sample Data** (Manual) | **MANUAL ACTION:** Upload the file **`Regional_sales_dataset.csv`** to the Volume **`alldatatech_project.sales_reporting.sales`**, whose file path is **`/Volumes/alldatatech_project/sales_reporting/sales/`**. |
| **6** | PySpark: Ingesting CSV Data | Reads the uploaded CSV file from the Volume and loads it into the **`regional_sales_data`** Delta table. |
| **7** | Viewing All Data | Validates successful ingestion by displaying the contents of the target table. |
| **8** | Creating a Subdirectory | Creates the `reporting` subdirectory within the Volume path (e.g., for archiving processed files). |
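
Steps 4, 5, 6, and 8 refer to the same Volume in two forms: the three-level Unity Catalog name used in SQL (`catalog.schema.volume`) and the `/Volumes/...` file path used for uploads and reads. The sketch below shows how one maps to the other. The helper name `volume_path` is hypothetical and is not part of the setup notebook.

```python
from pathlib import PurePosixPath


def volume_path(catalog: str, schema: str, volume: str, *parts: str) -> str:
    """Build the /Volumes/... file path for a Unity Catalog Volume."""
    return str(PurePosixPath("/Volumes", catalog, schema, volume, *parts))


# Path of the uploaded CSV (step 5) and of the subdirectory created in step 8
csv_path = volume_path(
    "alldatatech_project", "sales_reporting", "sales", "Regional_sales_dataset.csv"
)
subdir_path = volume_path("alldatatech_project", "sales_reporting", "sales", "reporting")

print(csv_path)
print(subdir_path)
```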

---

## 2. 🚀 Configuring the Extraction Pipeline

This section configures the monthly extraction process using a Databricks Job pipeline.
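
The extraction logic in `Sales_Extract.ipynb` is not reproduced in this guide. As an illustration only, a monthly extract commonly filters on the previous calendar month. The sketch below computes such a date window. The function name `previous_month_window` is hypothetical, and whether the notebook filters `sale_date` this way is an assumption.

```python
from datetime import date, timedelta
from typing import Tuple


def previous_month_window(today: date) -> Tuple[date, date]:
    """Return the first and last day of the calendar month before `today`."""
    first_of_this_month = today.replace(day=1)
    last_of_previous = first_of_this_month - timedelta(days=1)
    return last_of_previous.replace(day=1), last_of_previous


start, end = previous_month_window(date(2024, 3, 15))
print(start, end)
```

In a notebook, a window like this could feed a filter such as `sale_date BETWEEN start AND end`.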

### 2.1 Job Creation via YAML

1.  **Navigate to Jobs:** Go to **Workflows** (or Jobs & Pipelines) in the Databricks workspace sidebar.
2.  **Create Job:** Click **"Create Job"**.
3.  **Edit as YAML:** Switch the job creation interface to the **"Edit as YAML"** view.
4.  **Paste YAML Code:** Copy the entire content of the **`wf_regional_sales_extract.yml`** file and paste it into the YAML editor.
5.  **Save:** Save the job configuration. This creates the pipeline named **`wf_regional_sales_extract`**.

### 2.2 Notebook Preparation

1.  **Import Extraction Notebook:** Import the notebook named **`Sales_Extract.ipynb`** into your Databricks workspace.
2.  **Update Pipeline Path:**
    * Open the newly created job **`wf_regional_sales_extract`**.
    * Inspect the notebook task within the job.
    * **Update the Notebook Path** in the job task definition to point to the exact location of the imported **`Sales_Extract.ipynb`** notebook in your workspace.
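
To make the path update concrete, here is a minimal sketch of the change expressed on a Python dictionary. The contents of `wf_regional_sales_extract.yml` are not shown in this guide, so the dictionary only mirrors the general shape of a Databricks job definition (`name`, `tasks`, `task_key`, `notebook_task.notebook_path`). The task key, both workspace paths, and the helper `with_notebook_path` are placeholders chosen for illustration.

```python
import copy

# Assumed shape of the job settings; the values are placeholders, not the real YAML.
job_settings = {
    "name": "wf_regional_sales_extract",
    "tasks": [
        {
            "task_key": "sales_extract",  # hypothetical task key
            "notebook_task": {
                "notebook_path": "/Workspace/original/author/Sales_Extract",  # placeholder
            },
        }
    ],
}


def with_notebook_path(settings: dict, task_key: str, notebook_path: str) -> dict:
    """Return a copy of `settings` with one task's notebook path replaced."""
    updated = copy.deepcopy(settings)
    for task in updated["tasks"]:
        if task["task_key"] == task_key:
            task["notebook_task"]["notebook_path"] = notebook_path
            return updated
    raise KeyError(f"No task with task_key={task_key!r}")


updated_settings = with_notebook_path(
    job_settings,
    "sales_extract",
    "/Workspace/Users/someone@example.com/Sales_Extract",  # placeholder location
)
print(updated_settings["tasks"][0]["notebook_task"]["notebook_path"])
```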

### 2.3 Running the Pipeline

1.  **Execute Job:** Open the **`wf_regional_sales_extract`** job.
2.  Click **"Run now"** to execute the pipeline and initiate the data extraction process.
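
The same run can also be started from code rather than the UI. The sketch below assumes the Databricks SDK for Python (`databricks-sdk`) is installed and authenticated, and that its `WorkspaceClient` exposes `jobs.list(name=...)` and `jobs.run_now(job_id=...)`. Verify both calls against your installed SDK version. The helper `trigger_job_by_name` is hypothetical.

```python
def trigger_job_by_name(client, job_name: str):
    """Look up a job by its exact name and start a run.

    `client` is expected to behave like databricks.sdk.WorkspaceClient.
    """
    matches = list(client.jobs.list(name=job_name))
    if len(matches) != 1:
        raise LookupError(
            f"Expected exactly one job named {job_name!r}, found {len(matches)}"
        )
    return client.jobs.run_now(job_id=matches[0].job_id)


# Usage inside an authenticated environment (assumption, not executed here):
# from databricks.sdk import WorkspaceClient
# run = trigger_job_by_name(WorkspaceClient(), "wf_regional_sales_extract")
```

Looking the job up by name avoids hard-coding a job ID, which differs between workspaces.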

---

## Key Artifacts Summary

| Artifact | Type | Purpose |
| :--- | :--- | :--- |
| **`alldatatech_project Setup and Data Ingestion.ipynb`** | Notebook | One-time setup of UC Catalog, Schema, Volume, and Delta Table. |
| **`Regional_sales_dataset.csv`** | Raw Data File | Initial data source uploaded to the Volume. |
| **`wf_regional_sales_extract.yml`** | YAML Definition | Configuration file for the Databricks Job (Pipeline). |
| **`Sales_Extract.ipynb`** | Notebook | Contains the primary ETL/extraction logic run by the job. |

In [0]:
# Create the top-level catalog; IF NOT EXISTS keeps the cell safe to re-run
spark.sql("CREATE CATALOG IF NOT EXISTS alldatatech_project")

In [0]:
# Create the schema inside the catalog; IF NOT EXISTS keeps the cell safe to re-run
spark.sql("CREATE SCHEMA IF NOT EXISTS alldatatech_project.sales_reporting")

In [0]:
%sql
CREATE TABLE IF NOT EXISTS alldatatech_project.sales_reporting.regional_sales_data (
  sale_id BIGINT COMMENT 'Unique identifier for the sales transaction',
  sale_date DATE COMMENT 'The date the sale occurred (used for partitioning)',
  region_name STRING COMMENT 'The geographic region where the sale was made (e.g., North, South, East, West)',
  country_code STRING COMMENT 'Two-letter ISO country code (e.g., US, CA, UK)',
  product_sku STRING COMMENT 'Stock Keeping Unit (SKU) identifier for the product sold',
  product_name STRING COMMENT 'The full name of the product sold',
  quantity INT COMMENT 'The number of units sold in this transaction',
  unit_price DECIMAL(10, 2) COMMENT 'The price per unit at the time of sale',
  total_revenue DECIMAL(12, 2) COMMENT 'The calculated total revenue for this line item (quantity * unit_price)',
  dealer_id STRING COMMENT 'Identifier for the dealer or store where the sale originated',  
  customer_id STRING COMMENT 'Unique identifier for the purchasing customer',
  customer_segment STRING COMMENT 'Categorization of the customer (e.g., B2B, B2C, Premium)',  
  discount_amount DECIMAL(10, 2) COMMENT 'Total discount applied to this transaction line item',
  cost_of_goods_sold DECIMAL(10, 2) COMMENT 'The direct cost to the company for the units sold',
  gross_profit DECIMAL(12, 2) COMMENT 'Calculated gross profit: total_revenue - cost_of_goods_sold',
  payment_method STRING COMMENT 'The method of payment used (e.g., Credit Card, Cash, Invoice)',  
  shipment_status STRING COMMENT 'The current status of the order fulfillment (e.g., Shipped, Delivered, Pending)',
  shipping_fee DECIMAL(8, 2) COMMENT 'The fee charged for shipping this line item',  
  sales_channel STRING COMMENT 'The primary channel of the sale (e.g., Web, Retail, Field Sales, Tele)',
  campaign_id STRING COMMENT 'The ID of the marketing campaign that influenced this sale, if any',
  sales_rep_id STRING COMMENT 'The ID of the sales representative or employee responsible for the sale',
  created_ts TIMESTAMP COMMENT 'The timestamp when this record was inserted into the table'
)
USING delta
PARTITIONED BY (sale_date)
COMMENT 'Detailed transactional data for regional sales reporting and analysis, including marketing and personnel attribution.'
TBLPROPERTIES (
  'delta.minReaderVersion' = '1',
  'delta.minWriterVersion' = '2',
  'external.table.purge' = 'true',
  'delta.logRetentionDuration' = 'INTERVAL 30 DAYS'
);
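
The column comments above define two derived values: `total_revenue` as `quantity * unit_price`, and `gross_profit` as `total_revenue - cost_of_goods_sold`. A small worked check of that arithmetic follows. It uses `Decimal` to mirror the `DECIMAL` column types, and the sample figures are invented for illustration.

```python
from decimal import Decimal

# Invented sample values for a single line item
quantity = 3
unit_price = Decimal("19.99")
cost_of_goods_sold = Decimal("30.00")

# Derived columns, following the definitions in the column comments
total_revenue = quantity * unit_price
gross_profit = total_revenue - cost_of_goods_sold

print(total_revenue, gross_profit)
```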

In [0]:
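# Create the Volume that holds the raw files; IF NOT EXISTS keeps the cell safe to re-run
spark.sql("CREATE VOLUME IF NOT EXISTS alldatatech_project.sales_reporting.sales")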

#### MANUAL ACTION: Upload the file `Regional_sales_dataset.csv` to the following Volume path: `/Volumes/alldatatech_project/sales_reporting/sales/Regional_sales_dataset.csv`

In [0]:
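# Read the uploaded CSV from the Volume into a DataFrame
df = (
    spark.read.format("csv")
    .option("header", "true")        # first row holds the column names
    .option("nullValue", "null")     # treat the literal string "null" as NULL
    .option("inferSchema", "true")   # infer column types from the data
    .load("/Volumes/alldatatech_project/sales_reporting/sales/Regional_sales_dataset.csv")
)
display(df)

# Append the rows to the existing table.
# insertInto resolves columns by position, not by name, so the CSV column
# order must match the table definition.
df.write \
    .mode("append") \
    .insertInto("alldatatech_project.sales_reporting.regional_sales_data")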
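
Because `insertInto` resolves columns by position rather than by name, a CSV whose columns are ordered differently from the table can load values into the wrong columns. The sketch below is a plain-Python pre-check on two lists of column names. The helper `column_mismatches` is hypothetical, and the short column lists are a made-up subset. It also assumes the CSV header uses the same names as the table, which this guide does not confirm.

```python
def column_mismatches(source_columns, target_columns):
    """Compare source and target column lists.

    Returns (missing, extra, out_of_order):
      missing      - target columns absent from the source
      extra        - source columns absent from the target
      out_of_order - True if the shared columns appear in a different order
    """
    missing = [c for c in target_columns if c not in source_columns]
    extra = [c for c in source_columns if c not in target_columns]
    shared_in_source = [c for c in source_columns if c in target_columns]
    shared_in_target = [c for c in target_columns if c in source_columns]
    return missing, extra, shared_in_source != shared_in_target


# Made-up subset of columns for illustration
csv_columns = ["sale_id", "region_name", "sale_date"]
table_columns = ["sale_id", "sale_date", "region_name", "created_ts"]

missing, extra, out_of_order = column_mismatches(csv_columns, table_columns)
print(missing, extra, out_of_order)
```

In the notebook, the two lists would come from `df.columns` and `spark.table("alldatatech_project.sales_reporting.regional_sales_data").columns`. Selecting the DataFrame columns in the table's order before the write is one way to resolve an ordering mismatch.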

In [0]:
%sql
select * from alldatatech_project.sales_reporting.regional_sales_data

In [0]:
dbutils.fs.mkdirs("/Volumes/alldatatech_project/sales_reporting/sales/reporting")