## Task 1: Enterprise Data Model Design
### Key Entities and Relationships
1. **Customer**:
   - **Attributes**: CustomerID (Primary Key), Name, Email, RegistrationDate, CustomerType (e.g., regular, premium, etc.), Country, LastActiveDate.
   - **Purpose**: Captures the basic profile and segmentation information of each customer.
   - **ML Usage**:
     - Helps with customer segmentation for personalized recommendations.
     - Tracks retention rates, enabling churn prediction models.

2. **Transaction**:
   - **Attributes**: TransactionID (Primary Key), CustomerID (Foreign Key to Customer), ProductID, Date, Amount, PaymentMethod, TransactionStatus (e.g., Completed, Cancelled).
   - **Purpose**: Records every transaction a customer makes.
   - **ML Usage**:
     - Provides insights into purchasing behavior, e.g., average order value and purchase frequency.
     - Enables prediction of customer lifetime value (CLV) based on transaction trends.

3. **Interaction**:
   - **Attributes**: InteractionID (Primary Key), CustomerID (Foreign Key to Customer), InteractionType (e.g., view, click, add-to-cart), Timestamp, Page, Source (e.g., mobile app, website), Metadata.
   - **Purpose**: Captures granular details about how customers interact with the platform.
   - **ML Usage**:
     - Tracks customer journeys and engagement trends.
     - Helps in building features such as interaction-to-purchase conversion rates and session behavior patterns.

4. **Product**:
   - **Attributes**: ProductID (Primary Key), Category, Price, StockAvailability.
   - **Purpose**: Stores product information.
   - **ML Usage**: Enables recommendation models by combining product attributes with transaction and interaction data.
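
The four entities above can be sketched as plain Python dataclasses to make the keys and relationships explicit. This is only an illustrative sketch: the field types (for example, an integer `stock_availability` and a free-form `metadata` dict) and the helper `find_orphan_transactions` are assumptions, not part of the original model.

```python
import datetime as dt
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Customer:
    customer_id: str                      # primary key
    name: str
    email: str
    registration_date: dt.date
    customer_type: str                    # e.g. "regular", "premium"
    country: str
    last_active_date: Optional[dt.date] = None


@dataclass
class Product:
    product_id: str                       # primary key
    category: str
    price: float
    stock_availability: int               # assumed to be a unit count


@dataclass
class Transaction:
    transaction_id: str                   # primary key
    customer_id: str                      # foreign key -> Customer.customer_id
    product_id: str                       # references Product.product_id
    date: dt.datetime
    amount: float
    payment_method: str
    transaction_status: str               # e.g. "Completed", "Cancelled"


@dataclass
class Interaction:
    interaction_id: str                   # primary key
    customer_id: str                      # foreign key -> Customer.customer_id
    interaction_type: str                 # e.g. "view", "click", "add-to-cart"
    timestamp: dt.datetime
    page: str
    source: str                           # e.g. "mobile app", "website"
    metadata: dict = field(default_factory=dict)  # assumed free-form


def find_orphan_transactions(transactions, customers):
    """Hypothetical integrity check: transactions whose CustomerID has no Customer row."""
    known_ids = {c.customer_id for c in customers}
    return [t for t in transactions if t.customer_id not in known_ids]
```

A check like `find_orphan_transactions` is one way to confirm the foreign-key relationship holds before the tables are joined for feature engineering.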

### Data Model Diagram + pipeline detail
![](https://dev232image.blob.core.windows.net/devimages/detailed_pipeline.png)


### ML Use Cases

**Churn Prediction**

- Combine Customer and Interaction data to track activity levels.
- Use Transaction data to observe the gap between purchases.

- ML Features:
  - DaysSinceLastPurchase: From Transaction data.
  - ActivityRate: From Interaction data (total interactions divided by the time since registration).
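
As a rough illustration, the two churn features can be computed as follows. The sketch assumes that "time since registration" is measured in whole days and that a customer registered for less than a day is treated as having a one-day window; both function names are hypothetical.

```python
from datetime import datetime


def days_since_last_purchase(purchase_times, as_of):
    """Days between the most recent purchase and `as_of`; None if there are no purchases."""
    if not purchase_times:
        return None
    return (as_of - max(purchase_times)).days


def activity_rate(interaction_count, registered_at, as_of):
    """Interactions per day since registration (assumed minimum window: one day)."""
    days_registered = max((as_of - registered_at).days, 1)
    return interaction_count / days_registered


as_of = datetime(2024, 3, 1)
purchases = [datetime(2024, 1, 10), datetime(2024, 2, 20)]
print(days_since_last_purchase(purchases, as_of))        # 10
print(activity_rate(120, datetime(2024, 1, 1), as_of))   # 2.0
```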

**Customer Segmentation**

- Utilize Customer and Transaction data to segment customers by spending patterns.
- Combine with Interaction data to determine engagement.
- ML Features:
   - AverageOrderValue: From Transaction data.
   - SessionDuration: From Interaction data.
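
A minimal sketch of both segmentation features follows. The Interaction entity has no session identifier, so the sketch assumes a session ends after 30 minutes of inactivity; that threshold and the function names are illustrative choices, not part of the original design.

```python
from datetime import datetime, timedelta


def average_order_value(amounts):
    """Mean transaction amount; 0.0 when the customer has no transactions."""
    return sum(amounts) / len(amounts) if amounts else 0.0


def session_durations(timestamps, gap=timedelta(minutes=30)):
    """Split event times into sessions wherever the gap between consecutive
    events exceeds `gap`, and return each session's length in seconds."""
    ordered = sorted(timestamps)
    if not ordered:
        return []
    durations = []
    start = prev = ordered[0]
    for ts in ordered[1:]:
        if ts - prev > gap:
            # Close the current session and start a new one at this event
            durations.append((prev - start).total_seconds())
            start = ts
        prev = ts
    durations.append((prev - start).total_seconds())
    return durations
```

Averaging the values returned by `session_durations` per customer gives one possible definition of SessionDuration.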

**Personalized Recommendations**

- Use Transaction and Product data to identify frequently bought products.
- Combine with Interaction data to observe viewing habits.
- ML Features:
   - ProductAffinity: Percentage of interactions leading to purchases for each product.
   - CategoryPreferences: Most frequently purchased product categories.
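
The recommendation features might be computed along these lines. Note that the Interaction entity does not carry a ProductID, so the sketch assumes one can be derived from `Page` or `Metadata`; the function names and the `top_n` parameter are hypothetical.

```python
from collections import Counter


def product_affinity(interaction_product_ids, purchased_product_ids):
    """Per product: purchases as a percentage of interactions with that product."""
    interactions = Counter(interaction_product_ids)
    purchases = Counter(purchased_product_ids)
    return {
        product_id: 100.0 * purchases[product_id] / n_interactions
        for product_id, n_interactions in interactions.items()
    }


def category_preferences(purchased_product_ids, product_category, top_n=3):
    """Most frequently purchased categories, most common first."""
    counts = Counter(product_category[pid] for pid in purchased_product_ids)
    return [category for category, _ in counts.most_common(top_n)]
```

Here `product_category` is a plain mapping from ProductID to Category, standing in for a join against the Product table.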

**Lifetime Value (CLV) Prediction**

- Combine Customer, Transaction, and Interaction data to assess lifetime value.
- ML Features:
   - TotalSpend: Sum of all transactions.
   - InteractionToPurchaseRatio: Ratio of interactions to completed purchases.
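
A small sketch of the two CLV features, using dictionaries keyed by the Transaction attribute names. It assumes that only transactions with status `"Completed"` should count toward spend, which is narrower than "all transactions"; drop the status filter if cancelled orders should be included.

```python
def total_spend(transactions):
    """Sum of amounts over completed transactions (assumption: cancelled ones are excluded)."""
    return sum(
        t["Amount"] for t in transactions if t["TransactionStatus"] == "Completed"
    )


def interaction_to_purchase_ratio(interaction_count, transactions):
    """Interactions per completed purchase; None when there are no completed purchases."""
    completed = sum(1 for t in transactions if t["TransactionStatus"] == "Completed")
    return interaction_count / completed if completed else None
```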



## Task 2: Pipeline Architecture Diagram
![pipeline](https://dev232image.blob.core.windows.net/devimages/pipeline_diagram.png)

### Detailed Pipeline Workflow
![](https://dev232image.blob.core.windows.net/devimages/detailed_pipeline.png)
1. **Data Ingestion**:

   - Batch Ingestion: Use Databricks Auto Loader for schema inference and incremental file loading; ingest files from cloud storage (S3, Azure Data Lake) into the Bronze layer.
   - Streaming Ingestion: Use Structured Streaming to handle real-time data (e.g., events from Kafka).

2. **Schema Enforcement and Validation**:

   - Enforce strict schemas during ingestion to avoid malformed data downstream.
   - Validate and log anomalies into a separate error table.

3. **Transformation**:

   - Use Spark SQL and Python for deduplication, null handling, and join operations in the Silver layer.
   - Apply incremental updates with Delta Lake's `MERGE INTO`.

4. **Feature Engineering**:

   - Aggregate data into pre-computed features like AverageOrderValue and SessionDuration.
   - Store features in the Gold layer, ready for ML models or visualization.

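5. **Optimization**:
   - Partition Gold tables only on low-cardinality fields (e.g., Country or a date column); partitioning on a high-cardinality key such as CustomerID creates many small files.
   - Use Z-Ordering on frequently filtered high-cardinality columns (e.g., CustomerID) to cluster data and improve query performance.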
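
Two of the workflow steps above, Auto Loader ingestion and incremental `MERGE INTO` updates, can be sketched as small helpers. This is a hedged outline rather than a complete pipeline: the `cloudFiles` source is only available on Databricks, the default `json` file format and the `/schema` sub-path are assumed, and every function name and parameter here is hypothetical. The `spark` session is passed in, so nothing runs until the helpers are called.

```python
def autoloader_options(file_format, schema_location):
    """Auto Loader options: input file format and where the inferred schema is tracked."""
    return {
        "cloudFiles.format": file_format,
        "cloudFiles.schemaLocation": schema_location,
    }


def start_bronze_ingestion(spark, source_path, bronze_path, checkpoint_path,
                           file_format="json"):
    """Start a stream that loads newly arrived files into the Bronze Delta table."""
    stream = (
        spark.readStream
        .format("cloudFiles")  # Databricks Auto Loader source
        .options(**autoloader_options(file_format, checkpoint_path + "/schema"))
        .load(source_path)
    )
    return (
        stream.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .outputMode("append")
        .start(bronze_path)
    )


def build_merge_sql(target, source, key, columns):
    """Build an upsert statement: update matching rows, insert new ones."""
    set_clause = ", ".join(f"t.{c} = s.{c}" for c in columns)
    insert_cols = ", ".join([key, *columns])
    insert_vals = ", ".join(f"s.{c}" for c in [key, *columns])
    return (
        f"MERGE INTO {target} AS t "
        f"USING {source} AS s "
        f"ON t.{key} = s.{key} "
        f"WHEN MATCHED THEN UPDATE SET {set_clause} "
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )
```

The string returned by `build_merge_sql` would be passed to `spark.sql(...)`, with the source being a temporary view over the latest batch of changes.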


## Task 3: Real-Time Data Integration



```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

spark = SparkSession.builder.appName("IngestionTask").getOrCreate()

# Explicit schema for the incoming transaction files
# (file-based streaming sources need a schema up front by default)
schema = StructType([
    StructField("TransactionID", StringType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("Amount", FloatType(), True),
    StructField("Timestamp", StringType(), True)
])

# Read new CSV files as a stream as they arrive in the source directory
raw_stream = spark.readStream \
    .format("csv") \
    .option("header", True) \
    .schema(schema) \
    .load("/path/to/streaming/data")

# Validation and deduplication: drop repeated TransactionIDs and default missing amounts to 0.
# Note: without a watermark, dropDuplicates keeps every key it has seen in state indefinitely.
validated_stream = raw_stream.dropDuplicates(["TransactionID"]).na.fill({"Amount": 0})

# Append the cleaned stream to the Bronze Delta table; the checkpoint records progress
query = validated_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start("/path/to/delta/lake/bronze")

# Block until the stream is stopped or fails
query.awaitTermination()
```
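
The validation and deduplication step could be extended in two ways: routing invalid rows to an error table instead of silently filling them, and bounding the deduplication state with a watermark. The sketch below is one possible variant under stated assumptions: the validity rules, the one-hour delay, and both function names are illustrative. Deduplicating on key plus event time only removes repeats that carry the same timestamp, which is the trade-off that lets Spark discard old state.

```python
# A row is valid only when its keys are present and the amount is non-negative.
# Each term evaluates to true or false (never null), so NOT (...) selects
# exactly the remaining rows.
VALID_CONDITION = (
    "TransactionID IS NOT NULL AND CustomerID IS NOT NULL "
    "AND Amount IS NOT NULL AND Amount >= 0"
)


def split_valid_invalid(df):
    """Return (valid_rows, invalid_rows); the invalid rows can be written to an error table."""
    return df.filter(VALID_CONDITION), df.filter(f"NOT ({VALID_CONDITION})")


def dedupe_with_watermark(df, key_col="TransactionID", time_col="Timestamp",
                          delay="1 hour"):
    """Deduplicate on key and event time so state older than `delay` can be dropped."""
    return (
        df.withColumn(time_col, df[time_col].cast("timestamp"))  # string -> timestamp
        .withWatermark(time_col, delay)
        .dropDuplicates([key_col, time_col])
    )
```

Both helpers take a DataFrame such as `raw_stream` and return DataFrames, so each output can be written with its own `writeStream` query and checkpoint location.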


## Task 4: Feature Engineering and ML Table Creation

```python
from pyspark.sql.functions import avg, count

# Read Bronze data
bronze_df = spark.read.format("delta").load("/path/to/delta/lake/bronze")

# Feature calculation per customer: purchase frequency and average order value
features_df = bronze_df.groupBy("CustomerID") \
    .agg(
        count("TransactionID").alias("PurchaseFrequency"),
        avg("Amount").alias("AverageOrderValue")
    )

# Write the feature table to the Gold layer, replacing any previous version
features_df.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .save("/path/to/delta/lake/gold")

# Compact files and co-locate rows by CustomerID for faster lookups
spark.sql("OPTIMIZE '/path/to/delta/lake/gold' ZORDER BY (CustomerID)")
```
