# Azure Synapse Analytics Data Platform (ASADP)
## Data Ingestion and Exploration Notebook

This notebook demonstrates data ingestion patterns and exploratory data analysis using Azure Synapse Spark pools.

### Objectives:
1. Connect to various data sources
2. Ingest data into the data lake
3. Perform initial data exploration
4. Implement data quality checks
5. Create data profiling reports

## 1. Environment Setup and Imports

In [None]:
# Import required libraries
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# `sys` is needed for the interpreter version report below. It is imported
# explicitly here instead of relying on a wildcard import happening to leak
# the name into the notebook namespace.
import sys

# Configure plotting
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

print("Libraries imported successfully!")
# NOTE: `spark` is not defined anywhere in this notebook; it is expected to be
# the session object supplied by the notebook runtime.
print(f"Spark version: {spark.version}")
print(f"Python version: {sys.version}")

## 2. Data Lake Configuration

In [None]:
# Storage account plus the container backing each layer of the lake.
storage_account = "your_storage_account_name"
container_raw, container_processed, container_curated = "raw", "processed", "curated"

# Root ABFSS URI of each layer; later cells append relative folders to these.
raw_path = f"abfss://{container_raw}@{storage_account}.dfs.core.windows.net/"
processed_path = f"abfss://{container_processed}@{storage_account}.dfs.core.windows.net/"
curated_path = f"abfss://{container_curated}@{storage_account}.dfs.core.windows.net/"

# Echo the resolved locations, one per line.
print(
    f"Raw data path: {raw_path}",
    f"Processed data path: {processed_path}",
    f"Curated data path: {curated_path}",
    sep="\n",
)

## 3. Data Ingestion Functions

In [None]:
def ingest_csv_data(file_path, schema=None, header=True, delimiter=","):
    """
    Read CSV data from `file_path` into a Spark DataFrame.

    Args:
        file_path: Location handed to the Spark CSV reader.
        schema: Optional schema. When truthy it is applied to the reader;
            otherwise the reader's "inferSchema" option is turned on.
        header: Value for the reader's "header" option.
        delimiter: Value for the reader's "delimiter" option.

    Returns:
        The loaded DataFrame, or None if anything raised. Failures are
        reported with a printed message rather than propagated.
    """
    try:
        reader = spark.read.option("header", header).option("delimiter", delimiter)
        if schema:
            reader = reader.schema(schema)
        else:
            reader = reader.option("inferSchema", "true")
        df = reader.csv(file_path)

        # df.count() is a Spark action, so read problems surface here,
        # still inside the try block.
        print(f"Successfully ingested {df.count()} rows from {file_path}")
    except Exception as e:
        print(f"Error ingesting data from {file_path}: {str(e)}")
        return None
    else:
        return df

def ingest_json_data(file_path, multiline=False):
    """
    Read JSON data from `file_path` into a Spark DataFrame.

    Args:
        file_path: Location handed to the Spark JSON reader.
        multiline: Value for the reader's "multiline" option.

    Returns:
        The loaded DataFrame, or None if anything raised. Failures are
        reported with a printed message rather than propagated.
    """
    try:
        reader = spark.read.option("multiline", multiline)
        df = reader.json(file_path)
        # Counting is a Spark action; keep it inside the try block.
        print(f"Successfully ingested {df.count()} rows from {file_path}")
    except Exception as e:
        print(f"Error ingesting JSON data from {file_path}: {str(e)}")
        return None
    else:
        return df

def ingest_parquet_data(file_path):
    """
    Read Parquet data from `file_path` into a Spark DataFrame.

    Args:
        file_path: Location handed to the Spark Parquet reader.

    Returns:
        The loaded DataFrame, or None if anything raised. Failures are
        reported with a printed message rather than propagated.
    """
    try:
        reader = spark.read
        df = reader.parquet(file_path)
        # Counting is a Spark action; keep it inside the try block.
        print(f"Successfully ingested {df.count()} rows from {file_path}")
    except Exception as e:
        print(f"Error ingesting Parquet data from {file_path}: {str(e)}")
        return None
    else:
        return df

# Confirmation that this cell ran and the ingestion helpers above are defined.
print("Data ingestion functions defined successfully!")

## 4. Sample Data Generation

In [None]:
# Generate sample sales data for demonstration
def generate_sample_sales_data(num_records=10000):
    """
    Build a synthetic sales dataset and return it as a Spark DataFrame.

    NumPy's global random generator is seeded with 42 on every call, so the
    same `num_records` always yields the same data.

    Args:
        num_records: Number of order rows to generate.

    Returns:
        Spark DataFrame with the columns order_id, customer_id, product_id,
        order_date, quantity, unit_price, discount, sales_channel, region,
        category, gross_amount, discount_amount and net_amount.
    """
    np.random.seed(42)

    # The random draws below happen in a fixed order (customers, products,
    # quantities, prices, discounts, channels, regions, categories). Changing
    # that order would change the generated values for the same seed.
    customer_ids = [f'CUST-{np.random.randint(1, 1000):04d}' for _ in range(num_records)]
    product_ids = [f'PROD-{np.random.randint(1, 500):03d}' for _ in range(num_records)]
    quantities = np.random.randint(1, 10, num_records)
    unit_prices = np.round(np.random.uniform(10, 1000, num_records), 2)
    discounts = np.round(np.random.uniform(0, 0.3, num_records), 3)
    channels = np.random.choice(['Online', 'Retail', 'Wholesale', 'Partner'], num_records)
    regions = np.random.choice(['North', 'South', 'East', 'West', 'Central'], num_records)
    categories = np.random.choice(['Electronics', 'Clothing', 'Home', 'Sports', 'Books'], num_records)

    orders = pd.DataFrame({
        'order_id': [f'ORD-{i:06d}' for i in range(1, num_records + 1)],
        'customer_id': customer_ids,
        'product_id': product_ids,
        # Order timestamps are evenly spaced across 2023-2024.
        'order_date': pd.date_range(start='2023-01-01', end='2024-12-31', periods=num_records),
        'quantity': quantities,
        'unit_price': unit_prices,
        'discount': discounts,
        'sales_channel': channels,
        'region': regions,
        'category': categories,
    })

    # Derived monetary fields; each one builds on the previous.
    orders = orders.assign(
        gross_amount=lambda frame: frame['quantity'] * frame['unit_price'],
        discount_amount=lambda frame: frame['gross_amount'] * frame['discount'],
        net_amount=lambda frame: frame['gross_amount'] - frame['discount_amount'],
    )

    return spark.createDataFrame(orders)

# Generate 50,000 sample sales records; `sales_df` is the DataFrame every
# later cell in this notebook works on.
sales_df = generate_sample_sales_data(50000)
# count() is a Spark action, so this line materializes the data once.
print(f"Generated sample sales data with {sales_df.count()} records")
sales_df.printSchema()

## 5. Data Quality Assessment

In [None]:
def assess_data_quality(df, table_name="Unknown"):
    """
    Print a data quality report for a Spark DataFrame and return its numbers.

    The report covers row and column counts, per-column null counts, fully
    duplicated rows and the schema's data types. Running it triggers several
    Spark actions: a row count, one null-count aggregation and a distinct
    row count.

    Args:
        df: Spark DataFrame to assess.
        table_name: Label used in the printed report header.

    Returns:
        dict with the keys 'total_rows', 'total_columns', 'null_counts'
        (column name -> number of nulls), 'duplicate_rows' and
        'distinct_rows'.
    """
    print(f"\n=== Data Quality Assessment for {table_name} ===")

    # Basic statistics
    total_rows = df.count()
    total_columns = len(df.columns)

    def _percent_of_rows(part):
        # An empty DataFrame has no rows to divide by; report 0% instead of
        # raising ZeroDivisionError.
        return (part / total_rows) * 100 if total_rows else 0.0

    print(f"Total Rows: {total_rows:,}")
    print(f"Total Columns: {total_columns}")

    # Check for null values. `when` without an `otherwise` yields NULL for
    # non-matching rows and `count` skips NULLs, so each aggregate is the
    # number of null entries in that column. One job covers all columns.
    print("\n--- Null Value Analysis ---")
    null_counts = df.select(
        [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
    ).collect()[0].asDict()

    for column, null_count in null_counts.items():
        # Only columns that actually contain nulls are listed.
        if null_count > 0:
            print(f"{column}: {null_count:,} nulls ({_percent_of_rows(null_count):.2f}%)")

    # Check for duplicates: rows identical across every column.
    print("\n--- Duplicate Analysis ---")
    distinct_rows = df.distinct().count()
    duplicate_rows = total_rows - distinct_rows

    print(f"Distinct Rows: {distinct_rows:,}")
    print(f"Duplicate Rows: {duplicate_rows:,} ({_percent_of_rows(duplicate_rows):.2f}%)")

    # Data type analysis
    print("\n--- Data Type Analysis ---")
    for field in df.schema.fields:
        print(f"{field.name}: {field.dataType}")

    return {
        'total_rows': total_rows,
        'total_columns': total_columns,
        'null_counts': null_counts,
        'duplicate_rows': duplicate_rows,
        'distinct_rows': distinct_rows
    }

# Assess data quality of the sample sales data; the returned dict of counts
# is kept in `quality_report`.
quality_report = assess_data_quality(sales_df, "Sales Data")

## 6. Exploratory Data Analysis

In [None]:
def _summarize_sales_by(dimension, *extra_metrics):
    """Order count, total and average net sales per `dimension` value, largest total first."""
    # `sum` here is the Spark aggregate brought in by the wildcard
    # pyspark.sql.functions import, not the Python builtin.
    return (
        sales_df.groupBy(dimension)
        .agg(
            count("*").alias("order_count"),
            sum("net_amount").alias("total_sales"),
            avg("net_amount").alias("avg_order_value"),
            *extra_metrics
        )
        .orderBy(desc("total_sales"))
    )


# Summary statistics of the sales data.
print("=== Basic Statistics ===")
sales_df.describe().show()

# Sales by channel (this breakdown also totals the units sold).
print("\n=== Sales by Channel ===")
channel_analysis = _summarize_sales_by("sales_channel", sum("quantity").alias("total_quantity"))
channel_analysis.show()

# Sales by region
print("\n=== Sales by Region ===")
region_analysis = _summarize_sales_by("region")
region_analysis.show()

# Sales by category
print("\n=== Sales by Category ===")
category_analysis = _summarize_sales_by("category")
category_analysis.show()

## 7. Time Series Analysis

In [None]:
# Aggregate orders per calendar month ("yyyy-MM"), oldest month first.
monthly_sales = (
    sales_df
    .withColumn("year_month", date_format("order_date", "yyyy-MM"))
    .groupBy("year_month")
    .agg(
        count("*").alias("order_count"),
        sum("net_amount").alias("total_sales"),
        avg("net_amount").alias("avg_order_value"),
    )
    .orderBy("year_month")
)

print("=== Monthly Sales Trend ===")
monthly_sales.show(24)

# Bring the small monthly aggregate to the driver and parse the month labels
# into timestamps so matplotlib treats the x axis as dates.
monthly_sales_pd = monthly_sales.toPandas()
monthly_sales_pd['year_month'] = pd.to_datetime(monthly_sales_pd['year_month'])

# 2x2 dashboard: trend, channel, region and category panels.
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Sales Analysis Dashboard', fontsize=16)
(trend_ax, channel_ax), (region_ax, category_ax) = axes

# Top-left: monthly sales trend.
trend_ax.plot(monthly_sales_pd['year_month'], monthly_sales_pd['total_sales'], marker='o')
trend_ax.set_title('Monthly Sales Trend')
trend_ax.set_xlabel('Month')
trend_ax.set_ylabel('Total Sales')
trend_ax.tick_params(axis='x', rotation=45)

# Top-right: total sales per channel.
channel_data = channel_analysis.toPandas()
channel_ax.bar(channel_data['sales_channel'], channel_data['total_sales'])
channel_ax.set_title('Sales by Channel')
channel_ax.set_xlabel('Sales Channel')
channel_ax.set_ylabel('Total Sales')
channel_ax.tick_params(axis='x', rotation=45)

# Bottom-left: share of sales per region.
region_data = region_analysis.toPandas()
region_ax.pie(region_data['total_sales'], labels=region_data['region'], autopct='%1.1f%%')
region_ax.set_title('Sales Distribution by Region')

# Bottom-right: total sales per category.
category_data = category_analysis.toPandas()
category_ax.barh(category_data['category'], category_data['total_sales'])
category_ax.set_title('Sales by Category')
category_ax.set_xlabel('Total Sales')

plt.tight_layout()
plt.show()

## 8. Advanced Analytics

In [None]:
# Customer segmentation analysis: one row per customer with order frequency,
# spend and first/last order timestamps.
# `sum`, `max` and `min` here are the Spark aggregates from the wildcard
# pyspark.sql.functions import, not the Python builtins.
customer_metrics = sales_df.groupBy("customer_id") \
    .agg(
        count("*").alias("order_frequency"),
        sum("net_amount").alias("total_spent"),
        avg("net_amount").alias("avg_order_value"),
        max("order_date").alias("last_order_date"),
        min("order_date").alias("first_order_date")
    )

# Add recency calculation: days between today (driver clock) and the
# customer's most recent order.
# The literal is deliberately NOT named `current_date`: that name is a
# pyspark.sql.functions function brought in by the wildcard import, and
# rebinding it to a Column would break any later `current_date()` call.
reference_date = lit(datetime.now().date())
customer_metrics = customer_metrics.withColumn(
    "recency_days",
    datediff(reference_date, col("last_order_date"))
)

print("=== Customer Metrics Summary ===")
customer_metrics.describe().show()

# Product performance analysis, highest revenue first.
product_performance = sales_df.groupBy("product_id", "category") \
    .agg(
        count("*").alias("order_count"),
        sum("quantity").alias("total_quantity"),
        sum("net_amount").alias("total_revenue"),
        avg("unit_price").alias("avg_price"),
        avg("discount").alias("avg_discount")
    ) \
    .orderBy(desc("total_revenue"))

print("\n=== Top 20 Products by Revenue ===")
product_performance.show(20)

# Seasonal analysis by calendar quarter. Only the quarter column is derived:
# the aggregation groups by quarter alone, so month / day-of-week columns
# would be computed and then discarded.
seasonal_analysis = sales_df \
    .withColumn("quarter", quarter("order_date")) \
    .groupBy("quarter") \
    .agg(
        count("*").alias("order_count"),
        sum("net_amount").alias("total_sales"),
        avg("net_amount").alias("avg_order_value")
    ) \
    .orderBy("quarter")

print("\n=== Quarterly Sales Analysis ===")
seasonal_analysis.show()

## 9. Data Profiling Report

In [None]:
def generate_data_profile(df, table_name):
    """
    Build a per-column profiling report for a Spark DataFrame.

    For every column this runs `describe()`, a null count and a distinct
    count, i.e. three Spark jobs per column on top of the initial row count.

    Args:
        df: Spark DataFrame to profile.
        table_name: Label stored in the report.

    Returns:
        dict with 'table_name', 'timestamp' (ISO-8601 string of the local
        time at generation), 'row_count', 'column_count' and 'columns'.
        'columns' maps each column name to a dict with 'data_type',
        'null_count', 'distinct_count', 'statistics' (the non-null entries
        of `describe()` keyed by summary name), 'null_percentage' and
        'uniqueness_percentage'.
    """
    profile = {
        'table_name': table_name,
        'timestamp': datetime.now().isoformat(),
        'row_count': df.count(),
        'column_count': len(df.columns),
        'columns': {}
    }
    row_count = profile['row_count']

    def _percent_of_rows(part):
        # An empty DataFrame has no rows to divide by; report 0% instead of
        # raising ZeroDivisionError.
        return (part / row_count) * 100 if row_count else 0.0

    for column in df.columns:
        col_stats = df.select(column).describe().collect()

        column_profile = {
            'data_type': str(df.schema[column].dataType),
            'null_count': df.filter(col(column).isNull()).count(),
            # distinct() keeps NULL as a value of its own, so a column that
            # contains nulls counts one extra distinct entry.
            'distinct_count': df.select(column).distinct().count(),
            # Keep only the summary rows that produced a value.
            'statistics': {row['summary']: row[column] for row in col_stats if row[column] is not None}
        }

        # Share of rows that are null in this column.
        column_profile['null_percentage'] = _percent_of_rows(column_profile['null_count'])

        # Distinct values relative to the row count.
        column_profile['uniqueness_percentage'] = _percent_of_rows(column_profile['distinct_count'])

        profile['columns'][column] = column_profile

    return profile

# Profile the sample sales data and print a readable report.
data_profile = generate_data_profile(sales_df, "Sales Data")

# Report header.
print(
    "=== Data Profiling Report ===",
    f"Table: {data_profile['table_name']}",
    f"Generated: {data_profile['timestamp']}",
    f"Rows: {data_profile['row_count']:,}",
    f"Columns: {data_profile['column_count']}",
    sep="\n",
)

print("\n--- Column Profiles ---")
for col_name, col_profile in data_profile['columns'].items():
    report_lines = [
        f"\n{col_name}:",
        f"  Data Type: {col_profile['data_type']}",
        f"  Null Count: {col_profile['null_count']} ({col_profile['null_percentage']:.2f}%)",
        f"  Distinct Values: {col_profile['distinct_count']} ({col_profile['uniqueness_percentage']:.2f}%)",
    ]
    # The statistics line is only shown when describe() produced values.
    if col_profile['statistics']:
        report_lines.append(f"  Statistics: {col_profile['statistics']}")
    print("\n".join(report_lines))

## 10. Save Processed Data

In [None]:
# Save data to different layers of the data lake

# Take the ingestion timestamp once so the year and month in the raw path
# always come from the same instant. Two separate datetime.now() calls could
# straddle a month or year boundary and produce a mismatched partition path.
ingestion_time = datetime.now()

# Save raw data (CSV format) under a year=/month= folder for the ingestion date.
raw_sales_path = f"{raw_path}sales/year={ingestion_time.year}/month={ingestion_time.month:02d}/"
print(f"Saving raw data to: {raw_sales_path}")

# coalesce(1) collapses the data to a single partition before writing.
sales_df.coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(raw_sales_path)

# Save processed data (Parquet format), partitioned by the order's year and month.
processed_sales_path = f"{processed_path}sales/"
print(f"Saving processed data to: {processed_sales_path}")

sales_df.withColumn("year", year("order_date")) \
    .withColumn("month", month("order_date")) \
    .write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet(processed_sales_path)

# Save aggregated data (Delta format)
curated_sales_path = f"{curated_path}sales_summary/"
print(f"Saving curated data to: {curated_sales_path}")

# Create daily summary: one row per day, channel, region and category.
# `sum` is the Spark aggregate from the wildcard pyspark.sql.functions
# import, not the Python builtin.
daily_summary = sales_df \
    .withColumn("order_date_only", to_date("order_date")) \
    .groupBy("order_date_only", "sales_channel", "region", "category") \
    .agg(
        count("*").alias("order_count"),
        sum("quantity").alias("total_quantity"),
        sum("gross_amount").alias("gross_sales"),
        sum("discount_amount").alias("total_discounts"),
        sum("net_amount").alias("net_sales"),
        avg("net_amount").alias("avg_order_value"),
        countDistinct("customer_id").alias("unique_customers"),
        countDistinct("product_id").alias("unique_products")
    )

daily_summary.write \
    .mode("overwrite") \
    .format("delta") \
    .save(curated_sales_path)

print("\nData saved successfully to all layers!")
print(f"Raw layer: {raw_sales_path}")
print(f"Processed layer: {processed_sales_path}")
print(f"Curated layer: {curated_sales_path}")

## 11. Summary and Next Steps

In [None]:
# Final recap. The status glyphs below were mojibake in the original
# (UTF-8 bytes decoded as a single-byte code page); they are restored to the
# intended check mark, bullet and party-popper characters.
print("=== Data Ingestion and Exploration Summary ===")
print(f"✅ Successfully processed {sales_df.count():,} sales records")
print("✅ Performed comprehensive data quality assessment")
print("✅ Generated detailed data profiling report")
print("✅ Conducted exploratory data analysis")
print("✅ Created visualizations and insights")
print("✅ Saved data to medallion architecture layers")

# Total and average net sales come from one aggregation (one Spark job
# instead of two). `sum` is the Spark aggregate from the wildcard
# pyspark.sql.functions import, not the Python builtin.
sales_totals = sales_df.agg(
    sum('net_amount').alias('total_sales'),
    avg('net_amount').alias('avg_order_value')
).collect()[0]
# Both aggregates are NULL when there are no rows; fall back to 0.0 so the
# numeric formatting below cannot fail on None.
total_sales_value = sales_totals['total_sales'] or 0.0
avg_order_value = sales_totals['avg_order_value'] or 0.0

print("\n=== Key Insights ===")
print(f"• Total sales volume: ${total_sales_value:,.2f}")
print(f"• Average order value: ${avg_order_value:.2f}")
print(f"• Unique customers: {sales_df.select('customer_id').distinct().count():,}")
print(f"• Unique products: {sales_df.select('product_id').distinct().count():,}")

print("\n=== Next Steps ===")
print("1. Implement automated data quality monitoring")
print("2. Set up data lineage tracking")
print("3. Create real-time data ingestion pipelines")
print("4. Develop machine learning models for predictive analytics")
print("5. Build interactive dashboards and reports")
print("6. Implement data governance and security policies")

print("\n🎉 Data ingestion and exploration completed successfully!")