# 🚀 PipeZone - Getting Started

This notebook demonstrates how to use PipeZone to extract, transform, and load data.

## What you'll learn:
1. How to work with connections
2. How to list and inspect flows
3. How to use Spark for transformations
4. How to read and write data in MinIO

## Setup

In [None]:
import os
import sys

# Make the PipeZone helper modules (connection_manager, flow_executor) importable
sys.path.insert(0, '/home/pipezone/utils')

from connection_manager import ConnectionManager
from flow_executor import FlowExecutor
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

print(f"Working directory: {os.getcwd()}")
print(f"Python version: {sys.version}")

## 1️⃣ Working with Connections

In [None]:
# Initialize connection manager
cm = ConnectionManager()

# List all available connections
connections = cm.list_connections()
print("üìã Available connections:")
for conn in connections:
    print(f"  ‚Ä¢ {conn}")

In [None]:
# Load a specific connection configuration
config = cm.load_connection_config('minio_raw')
print(f"\nüì¶ Connection: {config['name']}")
print(f"   Type: {config['type']}")
print(f"   Description: {config['description']}")
print(f"   Bucket: {config['connection']['bucket']}")
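
The cell above indexes `name`, `type`, `description` and `connection.bucket` directly, so a connection file missing any of them raises a `KeyError`. A minimal validation sketch, assuming that layout (the exact PipeZone schema, the `minio`/`s3` type values, and the helper name `missing_connection_keys` are all hypothetical):

```python
# Hypothetical check for the connection layout the cell above reads.
# Assumption: only S3-style connections ("minio", "s3") carry a bucket.
from typing import Any, Dict, List

REQUIRED_TOP_LEVEL = ("name", "type", "description", "connection")

def missing_connection_keys(config: Dict[str, Any]) -> List[str]:
    """Return dotted paths of keys the demo cell expects but cannot find."""
    missing = [key for key in REQUIRED_TOP_LEVEL if key not in config]
    details = config.get("connection", {})
    if config.get("type") in ("minio", "s3") and "bucket" not in details:
        missing.append("connection.bucket")
    return missing

example = {
    "name": "minio_raw",
    "type": "minio",
    "description": "Raw landing zone",
    "connection": {"bucket": "raw"},
}
print(missing_connection_keys(example))  # → []
print(missing_connection_keys({"name": "broken", "type": "minio"}))
# → ['description', 'connection', 'connection.bucket']
```

In the notebook you could call `missing_connection_keys(config)` right after `load_connection_config` and print the result before touching individual keys.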

In [None]:
# Test connections
print("\nüîç Testing connections:")
for conn in connections:
    try:
        status = "‚úÖ" if cm.test_connection(conn) else "‚ùå"
        print(f"  {status} {conn}")
    except Exception as e:
        print(f"  ‚ö†Ô∏è  {conn} - {str(e)}")
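
The loop above only prints each result. If you want to act on the outcome (for example, skip flows whose connections are down), a small sketch can collect the results instead. Here `tester` stands in for `cm.test_connection`, and the connection names and `fake_tester` are illustrative only:

```python
# Sketch: group connection test results instead of only printing them.
# `tester` is any callable that returns a bool or raises, e.g. cm.test_connection.
from typing import Callable, Dict, Iterable, List

def summarize_connection_tests(
    names: Iterable[str], tester: Callable[[str], bool]
) -> Dict[str, List[str]]:
    summary: Dict[str, List[str]] = {"ok": [], "failed": [], "error": []}
    for name in names:
        try:
            bucket = "ok" if tester(name) else "failed"
        except Exception as exc:  # like the notebook loop, keep going on errors
            bucket = "error"
            print(f"{name}: {exc}")
        summary[bucket].append(name)
    return summary

# Hypothetical tester used only to illustrate the three outcomes.
def fake_tester(name: str) -> bool:
    if name == "mysql_logs":
        raise ConnectionError("timeout")
    return name.startswith("minio")

result = summarize_connection_tests(["minio_raw", "postgres_src", "mysql_logs"], fake_tester)
print(result)
# → {'ok': ['minio_raw'], 'failed': ['postgres_src'], 'error': ['mysql_logs']}
```

With the real manager this would be `summarize_connection_tests(connections, cm.test_connection)`.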

## 2️⃣ Working with Flows

In [None]:
# List available flows (sorted so that flows[0] below is deterministic)
from pathlib import Path

flows_path = '/opt/pipezone/metadata/flows'
flows = sorted(f.stem for f in Path(flows_path).glob('*.yml'))

print("📊 Available flows:")
for flow in flows:
    print(f"  • {flow}")

In [None]:
# Load the configuration of the first flow
executor = FlowExecutor()

if flows:
    flow_config = executor.load_flow_config(flows[0])
    print(f"\n📋 Flow: {flow_config['name']}")
    print(f"   Description: {flow_config['description']}")
    print(f"   Source: {flow_config['source']['connection']}")
    print(f"   Target: {flow_config['target']['connection']}")
    print(f"   Schedule: {flow_config.get('schedule', {}).get('cron', 'Not scheduled')}")
else:
    print(f"No flow definitions found in {flows_path}")
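
The cell above reports `schedule.cron`, while the Next Steps section says a flow is scheduled only when `schedule.enabled` is `true`. A sketch that combines the two, assuming that `enabled` defaults to false when absent (the flow dict and the helper name `flow_schedule` are hypothetical):

```python
# Sketch: return a flow's cron expression only if scheduling is enabled.
# Assumed layout: schedule: {enabled: bool, cron: str}; missing "enabled" means off.
from typing import Any, Dict, Optional

def flow_schedule(flow_config: Dict[str, Any]) -> Optional[str]:
    """Return the cron expression if the flow is enabled for scheduling, else None."""
    schedule = flow_config.get("schedule") or {}  # also handles an empty YAML value
    if not schedule.get("enabled", False):
        return None
    return schedule.get("cron")

flow = {
    "name": "customers_to_raw",
    "source": {"connection": "postgres_src"},
    "target": {"connection": "minio_raw"},
    "schedule": {"enabled": True, "cron": "0 2 * * *"},
}
print(flow_schedule(flow))               # → 0 2 * * *
print(flow_schedule({"name": "adhoc"}))  # → None
```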

## 3️⃣ Using Spark

In [None]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("PipeZone Demo") \
    .master(os.getenv('SPARK_MASTER', 'local[*]')) \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print(f"✅ Spark version: {spark.version}")
print(f"   Master: {spark.sparkContext.master}")
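
Reading and writing `s3a://` paths later in this notebook depends on Hadoop's S3A connector being configured, either in the cluster defaults or on the session. If your environment does not already do this, one option is to pass the settings through the builder. This sketch builds them from environment variables; the `fs.s3a.*` keys are standard Hadoop S3A options, but the `MINIO_*` variable names and the `http://minio:9000` default are assumptions, and the `hadoop-aws` package must be on Spark's classpath:

```python
# Sketch: S3A settings for MinIO, built from environment variables.
# MINIO_ENDPOINT / MINIO_ACCESS_KEY / MINIO_SECRET_KEY are hypothetical names.
import os
from typing import Dict, Mapping

def s3a_options(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    endpoint = env.get("MINIO_ENDPOINT", "http://minio:9000")
    return {
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.access.key": env.get("MINIO_ACCESS_KEY", ""),
        "spark.hadoop.fs.s3a.secret.key": env.get("MINIO_SECRET_KEY", ""),
        # MinIO is usually addressed by path (http://host/bucket), not by subdomain.
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.connection.ssl.enabled": str(endpoint.startswith("https")).lower(),
    }

# Usage with the builder above (not executed here):
#   builder = SparkSession.builder.appName("PipeZone Demo")
#   for key, value in s3a_options().items():
#       builder = builder.config(key, value)
#   spark = builder.getOrCreate()
```

Note that `getOrCreate()` returns the existing session if one is already running, so these settings only take effect on a fresh session.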

In [None]:
# Create sample data
from datetime import datetime, timedelta
import random

# Generate sample customer data
sample_data = []
for i in range(1000):
    sample_data.append({
        'customer_id': i + 1,
        'name': f'Customer {i + 1}',
        'email': f'customer{i + 1}@example.com',
        'country': random.choice(['UZ', 'KZ', 'TJ', 'KG', 'TM']),
        'created_at': datetime.now() - timedelta(days=random.randint(0, 365)),
        'updated_at': datetime.now() - timedelta(hours=random.randint(0, 24))
    })

df = spark.createDataFrame(sample_data)
print(f"\nüìä Created sample dataset with {df.count()} records")
df.show(5)
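
The loop above draws from the global `random` generator and `datetime.now()`, so every run produces different rows and different counts per country. If you want reproducible output (for example, to compare results between runs), a seeded variant is one option. The seed and the fixed reference time are arbitrary choices, and `make_customers` is a hypothetical helper:

```python
# Sketch: reproducible sample customers using a private, seeded RNG
# and a fixed reference time instead of datetime.now().
import random
from datetime import datetime, timedelta
from typing import Any, Dict, List

COUNTRIES = ["UZ", "KZ", "TJ", "KG", "TM"]

def make_customers(
    n: int, seed: int = 42, now: datetime = datetime(2024, 1, 1)
) -> List[Dict[str, Any]]:
    rng = random.Random(seed)  # independent of the global random state
    rows = []
    for i in range(1, n + 1):
        rows.append({
            "customer_id": i,
            "name": f"Customer {i}",
            "email": f"customer{i}@example.com",
            "country": rng.choice(COUNTRIES),
            "created_at": now - timedelta(days=rng.randint(0, 365)),
            "updated_at": now - timedelta(hours=rng.randint(0, 24)),
        })
    return rows
```

In the notebook this would replace the loop with `df = spark.createDataFrame(make_customers(1000))`.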

In [None]:
# Analyze data
print("\n📈 Data Analysis:")
df.groupBy('country').count().orderBy('count', ascending=False).show()

print("\n📊 Schema:")
df.printSchema()
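
To sanity-check the `groupBy('country').count()` result on a small sample, the same aggregation can be computed in plain Python. The tie-break by country code is an addition of this sketch; Spark's `orderBy('count', ascending=False)` leaves the order of equal counts unspecified:

```python
# Sketch: per-country counts in plain Python, sorted by count descending.
from collections import Counter
from typing import Dict, Iterable, List, Tuple

def country_counts(rows: Iterable[Dict[str, object]]) -> List[Tuple[str, int]]:
    counts = Counter(row["country"] for row in rows)
    # Descending by count, then alphabetical by country for a stable order.
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))

rows = [{"country": c} for c in ["UZ", "KZ", "UZ", "TJ", "KZ", "UZ"]]
print(country_counts(rows))  # → [('UZ', 3), ('KZ', 2), ('TJ', 1)]
```

Applied to `sample_data`, the totals should match the Spark output row for row.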

In [None]:
# Apply transformations
df_transformed = df \
    .withColumn('email_domain', F.split(F.col('email'), '@').getItem(1)) \
    .withColumn('year', F.year('created_at')) \
    .withColumn('month', F.month('created_at')) \
    .withColumn('processed_at', F.current_timestamp())

print("\n✨ Transformed data:")
df_transformed.select('customer_id', 'email', 'email_domain', 'country', 'year', 'month').show(5)
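
The derived columns are simple enough to mirror in plain Python, which makes the logic easy to unit-test outside Spark. `F.split(col, '@').getItem(1)` takes the second `'@'`-separated segment; `str.split("@")` splits the same way for a literal `@`. Returning `None` when there is no `@` is a choice of this sketch, and `derive_columns` is a hypothetical helper:

```python
# Sketch: plain-Python version of the email_domain / year / month columns.
from datetime import datetime
from typing import Dict, Optional, Union

def derive_columns(email: str, created_at: datetime) -> Dict[str, Optional[Union[str, int]]]:
    parts = email.split("@")
    return {
        "email_domain": parts[1] if len(parts) > 1 else None,  # second segment, like getItem(1)
        "year": created_at.year,
        "month": created_at.month,
    }

print(derive_columns("customer1@example.com", datetime(2024, 5, 17)))
# → {'email_domain': 'example.com', 'year': 2024, 'month': 5}
```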

## 4️⃣ Working with MinIO (S3)

In [None]:
# Write data to MinIO (S3)
minio_path = "s3a://raw/demo/customers/"

df_transformed.write \
    .format('parquet') \
    .mode('overwrite') \
    .partitionBy('country', 'year', 'month') \
    .option('compression', 'snappy') \
    .save(minio_path)
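
`partitionBy('country', 'year', 'month')` writes one nested `column=value` directory per partition column under the target path, and the partition columns are stored in the directory names rather than inside the Parquet files. This sketch only reproduces the directory part; Spark generates the part-file names itself, and it escapes special characters in partition values, which this simplified `partition_dir` helper (hypothetical) does not:

```python
# Sketch: the Hive-style directory a row lands in under partitionBy(...).
from typing import Dict, Sequence

def partition_dir(base: str, row: Dict[str, object], columns: Sequence[str]) -> str:
    segments = [f"{col}={row[col]}" for col in columns]
    return base.rstrip("/") + "/" + "/".join(segments) + "/"

print(partition_dir(
    "s3a://raw/demo/customers/",
    {"country": "UZ", "year": 2024, "month": 5},
    ["country", "year", "month"],
))
# → s3a://raw/demo/customers/country=UZ/year=2024/month=5/
```

Because `year` and `month` are integers, their directories read `month=5`, not `month=05`.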

print(f"\n✅ Data written to MinIO: {minio_path}")

In [None]:
# Read data back from MinIO
df_read = spark.read \
    .format('parquet') \
    .load(minio_path)

print(f"\nüìñ Read {df_read.count()} records from MinIO")
df_read.show(5)
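
When reading the dataset back, Spark's partition discovery turns the `key=value` directory names into columns again and infers their types (here `year` and `month` come back as integers). Because `country` is a directory level, a filter on it lets Spark skip non-matching directories entirely. A simplified sketch of both ideas, handling only plain integers and strings; the file names and the helpers `parse_partition_values` and `prune` are hypothetical:

```python
# Sketch: recover partition values from a path, and keep only matching files.
from typing import Dict, List, Union

def parse_partition_values(path: str) -> Dict[str, Union[int, str]]:
    values: Dict[str, Union[int, str]] = {}
    for segment in path.strip("/").split("/"):
        if "=" not in segment:
            continue  # bucket, prefix, or part-file name
        key, _, raw = segment.partition("=")
        values[key] = int(raw) if raw.lstrip("-").isdigit() else raw
    return values

def prune(paths: List[str], **filters: Union[int, str]) -> List[str]:
    """Keep only paths whose partition values match every filter."""
    return [
        p for p in paths
        if all(parse_partition_values(p).get(k) == v for k, v in filters.items())
    ]

files = [
    "s3a://raw/demo/customers/country=UZ/year=2024/month=5/part-00000.snappy.parquet",
    "s3a://raw/demo/customers/country=KZ/year=2024/month=5/part-00001.snappy.parquet",
]
print(parse_partition_values(files[0]))  # → {'country': 'UZ', 'year': 2024, 'month': 5}
print(prune(files, country="UZ"))
```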

In [None]:
# Query data from MinIO
print("\nüîç Query: Customers from Uzbekistan:")
df_read.filter(F.col('country') == 'UZ') \
    .select('customer_id', 'name', 'email', 'created_at') \
    .show(10)

## 🎯 Next Steps

1. **Create your own connections** in `metadata/connections/`
2. **Define data flows** in `metadata/flows/`
3. **Test flows** with `FlowExecutor` before scheduling them
4. **Schedule flows** in Airflow by setting `schedule.enabled: true` in the flow definition
5. **Monitor executions** in the MySQL `pipeline_execution_logs` table

## 📚 Resources

- MinIO Console: http://localhost:9001
- Airflow UI: http://localhost:8080
- Spark UI: http://localhost:8081
- Documentation: See README.md

Happy data engineering! üöÄ