In [5]:
"""
Join order data from S3 with active-customer data from Postgres to analyze active customers.
This demo uses synthetic data in place of the real sources.
"""

import os
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
# NOTE: `max`/`min` below are the pyspark column functions and shadow the builtins in this module.
from pyspark.sql.functions import col, to_date, count, mean, max, min, sum as spark_sum, countDistinct
from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType, IntegerType

# Configuration.
# Every value can be overridden by an environment variable of the same name, so
# real hosts and credentials never need to be committed to source. The defaults
# are the original demo placeholders.
ORDERS_S3_PATH = os.environ.get(
    "ORDERS_S3_PATH", "s3a://startde-project/roma_mesh/orders_filtered"
)
PG_HOST = os.environ.get("PG_HOST", "my-postgres-host")
PG_DATABASE = os.environ.get("PG_DATABASE", "my-database")
PG_USER = os.environ.get("PG_USER", "my-username")
PG_PASSWORD = os.environ.get("PG_PASSWORD", "my-password")

def generate_synthetic_data(spark, n_orders=1000, n_customers=200, seed=42):
    """Generate synthetic orders and per-customer aggregates as Spark DataFrames.

    Stands in for the real S3 (orders) and Postgres (active customers) sources
    in this demo. With the default arguments the generated data is the same as
    in earlier versions of this function.

    Args:
        spark: SparkSession used to build the DataFrames.
        n_orders: Number of orders to generate.
        n_customers: Customer ids are drawn uniformly from
            CUST_0001 .. CUST_<n_customers>.
        seed: Seed applied to NumPy's *global* RNG (``np.random.seed``), which
            makes the data reproducible but also resets global RNG state.

    Returns:
        ``(orders_df, customers_df)``:

        * orders_df: order_id, customer_id, order_purchase_timestamp (date in
          2023), order_status, order_value (normal(150, 50) clipped to [20, 500]).
        * customers_df: one row per customer with at least one order --
          customer_id, order_count, total_spent, and customer_segment
          ('Low' for 1-2 orders, 'Medium' for 3-5, 'High' for more than 5).
    """
    # Explicit Spark schemas for both outputs.
    orders_schema = StructType([
        StructField("order_id", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("order_purchase_timestamp", DateType(), True),
        StructField("order_status", StringType(), True),
        StructField("order_value", DoubleType(), True)
    ])

    customers_schema = StructType([
        StructField("customer_id", StringType(), True),
        StructField("order_count", IntegerType(), True),
        StructField("total_spent", DoubleType(), True),
        StructField("customer_segment", StringType(), True)
    ])

    # Build the orders in pandas first. The columns are generated in this exact
    # order so the global RNG is consumed in the same sequence for a given seed.
    np.random.seed(seed)
    dates = [datetime(2023, 1, 1) + timedelta(days=x) for x in range(365)]

    orders_pd = pd.DataFrame({
        'order_id': [f'ORD_{i:06d}' for i in range(1, n_orders + 1)],
        'customer_id': [f'CUST_{np.random.randint(1, n_customers + 1):04d}' for _ in range(n_orders)],
        'order_purchase_timestamp': np.random.choice(dates, n_orders),
        'order_status': np.random.choice(['delivered', 'processing', 'shipped', 'canceled'],
                                         n_orders, p=[0.75, 0.1, 0.1, 0.05]),
        'order_value': np.random.normal(150, 50, n_orders).clip(20, 500)
    })

    # Per-customer aggregates ("active customers" = everyone with >= 1 order).
    customers_pd = (
        orders_pd.groupby('customer_id')
        .agg(order_count=('order_id', 'count'), total_spent=('order_value', 'sum'))
        .reset_index()
    )
    customers_pd['order_count'] = customers_pd['order_count'].astype(int)
    customers_pd['total_spent'] = customers_pd['total_spent'].astype(float)

    # Right-closed bins: (0, 2] -> Low, (2, 5] -> Medium, (5, inf) -> High.
    # An unbounded top edge keeps heavy buyers from silently getting a NaN segment.
    customers_pd['customer_segment'] = pd.cut(customers_pd['order_count'],
                                              bins=[0, 2, 5, np.inf],
                                              labels=['Low', 'Medium', 'High'])

    customers_pd = customers_pd[['customer_id', 'order_count', 'total_spent', 'customer_segment']]

    # Convert to Spark DataFrames with the explicit schemas.
    orders_df = spark.createDataFrame(orders_pd, schema=orders_schema)
    customers_df = spark.createDataFrame(customers_pd, schema=customers_schema)

    return orders_df, customers_df

def _build_spark_session():
    """Create (or reuse) the SparkSession for this job, with S3A credentials.

    The S3 keys come from the S3_ACCESS_KEY / S3_SECRET_KEY environment
    variables. Python does not expand "${...}" inside string literals, so the
    job previously always handed Spark the literal placeholder text. That text
    is still used as the fallback when a variable is unset, so behavior without
    the variables is unchanged.
    """
    builder = SparkSession.builder.appName("ActiveCustomersAnalysis")
    for conf_key, env_var in (
        ("spark.hadoop.fs.s3a.access.key", "S3_ACCESS_KEY"),
        ("spark.hadoop.fs.s3a.secret.key", "S3_SECRET_KEY"),
    ):
        builder = builder.config(conf_key, os.environ.get(env_var, "${" + env_var + "}"))
    return builder.getOrCreate()


def _print_order_stats(orders_df):
    """Print order counts per status, order-value statistics and a sample of orders."""
    orders_stats = orders_df.groupBy("order_status").agg(count("order_id").alias("count"))
    print("\nüìà –°—Ç–∞—Ç–∏—Å—Ç–∏–∫–∞ –ø–æ —Å—Ç–∞—Ç—É—Å–∞–º –∑–∞–∫–∞–∑–æ–≤:")
    orders_stats.show()

    # mean/max/min here are the pyspark.sql.functions versions imported at module level.
    value_stats = orders_df.agg(
        mean("order_value").alias("avg_value"),
        max("order_value").alias("max_value"),
        min("order_value").alias("min_value"),
    )
    print("üí∞ –°—Ç–∞—Ç–∏—Å—Ç–∏–∫–∞ –ø–æ —Å—Ç–æ–∏–º–æ—Å—Ç–∏ –∑–∞–∫–∞–∑–æ–≤:")
    value_stats.show()

    print("\n–ü—Ä–∏–º–µ—Ä –¥–∞–Ω–Ω—ã—Ö –∑–∞–∫–∞–∑–æ–≤:")
    orders_df.show(5)
    print("‚îÄ" * 80)


def _print_customer_stats(active_customers_df):
    """Print customer counts per segment, order/spend statistics and a sample of customers."""
    segment_stats = active_customers_df.groupBy("customer_segment").agg(count("customer_id").alias("count"))
    print("\nüìä –°–µ–≥–º–µ–Ω—Ç–∞—Ü–∏—è –∫–ª–∏–µ–Ω—Ç–æ–≤:")
    segment_stats.show()

    customer_stats = active_customers_df.agg(
        mean("order_count").alias("avg_orders"),
        max("order_count").alias("max_orders"),
        mean("total_spent").alias("avg_spent"),
    )
    print("üìà –°—Ç–∞—Ç–∏—Å—Ç–∏–∫–∞ –∞–∫—Ç–∏–≤–Ω—ã—Ö –∫–ª–∏–µ–Ω—Ç–æ–≤:")
    customer_stats.show()

    print("\n–ü—Ä–∏–º–µ—Ä –¥–∞–Ω–Ω—ã—Ö –∞–∫—Ç–∏–≤–Ω—ã—Ö –∫–ª–∏–µ–Ω—Ç–æ–≤:")
    active_customers_df.show(5)
    print("‚îÄ" * 80)


def _join_orders_with_customers(orders_df, active_customers_df):
    """Join orders to active customers on customer_id (step 3) and print join statistics.

    The inputs are aliased "orders" and "customers" so that later selects can
    disambiguate the two customer_id columns the join produces.
    """
    print("\n=== –®–ê–ì 3: JOIN –û–ü–ï–†–ê–¶–ò–Ø ===")
    print("üîó –û–±—ä–µ–¥–∏–Ω–µ–Ω–∏–µ –∑–∞–∫–∞–∑–æ–≤ —Å –∞–∫—Ç–∏–≤–Ω—ã–º–∏ –∫–ª–∏–µ–Ω—Ç–∞–º–∏...")

    # No `how` argument, so Spark performs its default (inner) join.
    joined_df = orders_df.alias("orders").join(
        active_customers_df.alias("customers"),
        col("orders.customer_id") == col("customers.customer_id"),
    )

    joined_segment_stats = joined_df.groupBy("customers.customer_segment").agg(
        count("orders.order_id").alias("order_count")
    )
    print("\nüìä –†–∞—Å–ø—Ä–µ–¥–µ–ª–µ–Ω–∏–µ –∑–∞–∫–∞–∑–æ–≤ –ø–æ —Å–µ–≥–º–µ–Ω—Ç–∞–º –∫–ª–∏–µ–Ω—Ç–æ–≤:")
    joined_segment_stats.show()

    print("\n–ü—Ä–∏–º–µ—Ä –æ–±—ä–µ–¥–∏–Ω–µ–Ω–Ω—ã—Ö –¥–∞–Ω–Ω—ã—Ö:")
    joined_df.select(
        col("orders.order_id"),
        col("orders.customer_id"),
        col("orders.order_status"),
        col("customers.order_count"),
        col("customers.customer_segment"),
    ).show(5)
    print("‚îÄ" * 80)
    return joined_df


def _build_result(joined_df):
    """Project the joined data onto the output columns (step 4) and print its statistics."""
    print("\n=== –®–ê–ì 4: –ü–û–î–ì–û–¢–û–í–ö–ê –†–ï–ó–£–õ–¨–¢–ê–¢–ê ===")

    result_df = joined_df.select(
        col("orders.order_id"),
        to_date(col("orders.order_purchase_timestamp")).alias("order_date"),
        col("orders.customer_id"),
        col("orders.order_status"),
        col("orders.order_value"),
        col("customers.order_count"),
        col("customers.total_spent"),
        col("customers.customer_segment"),
    )

    final_stats = result_df.agg(
        count("order_id").alias("total_orders"),
        countDistinct("customer_id").alias("unique_customers"),
        spark_sum("order_value").alias("total_revenue"),
        mean("order_value").alias("avg_order_value"),
    )
    print("\nüìä –§–ò–ù–ê–õ–¨–ù–ê–Ø –°–¢–ê–¢–ò–°–¢–ò–ö–ê:")
    final_stats.show()

    status_final_stats = result_df.groupBy("order_status").agg(count("order_id").alias("count"))
    print("üìà –°—Ç–∞—Ç—É—Å—ã –∑–∞–∫–∞–∑–æ–≤ –≤ —Ñ–∏–Ω–∞–ª—å–Ω–æ–º –Ω–∞–±–æ—Ä–µ:")
    status_final_stats.show()

    print("\n–ü—Ä–∏–º–µ—Ä —Ñ–∏–Ω–∞–ª—å–Ω—ã—Ö –¥–∞–Ω–Ω—ã—Ö:")
    result_df.show(5)
    print("‚îÄ" * 80)
    return result_df


def _print_etl_summary(orders_df, active_customers_df, joined_df, result_df, final_count):
    """Print the end-of-run summary: row counts per stage, revenue and join coverage."""
    print("\n" + "=" * 50)
    print("üéØ –ò–¢–û–ì–ò ETL-–ü–†–û–¶–ï–°–°–ê")
    print("=" * 50)

    total_orders = orders_df.count()
    active_customers_count = active_customers_df.count()
    joined_count = joined_df.count()
    # SQL SUM over zero rows is NULL (None); report 0 instead of crashing the format spec below.
    total_revenue = result_df.agg(spark_sum("order_value")).collect()[0][0] or 0.0
    # Share of orders that matched an active customer; guard against an empty source.
    efficiency = joined_count / total_orders * 100 if total_orders else 0.0

    summary_data = [
        ("–ò—Å—Ç–æ—á–Ω–∏–∫ S3", f"{total_orders:,} –∑–∞–∫–∞–∑–æ–≤"),
        ("–ê–∫—Ç–∏–≤–Ω—ã–µ –∫–ª–∏–µ–Ω—Ç—ã", f"{active_customers_count:,} –∫–ª–∏–µ–Ω—Ç–æ–≤"),
        ("–ü–æ—Å–ª–µ JOIN", f"{joined_count:,} –∑–∞–ø–∏—Å–µ–π"),
        ("–§–∏–Ω–∞–ª—å–Ω—ã–π —Ä–µ–∑—É–ª—å—Ç–∞—Ç", f"{final_count:,} —Å—Ç—Ä–æ–∫"),
        ("–û–±—â–∞—è –≤—ã—Ä—É—á–∫–∞", f"${total_revenue:,.0f}"),
        ("–≠—Ñ—Ñ–µ–∫—Ç–∏–≤–Ω–æ—Å—Ç—å", f"{efficiency:.1f}%"),
    ]
    for metric, value in summary_data:
        print(f"‚Ä¢ {metric}: {value}")


def main():
    """Run the demo ETL: load orders and active customers, join them, report, "write".

    Both sources are currently replaced by ``generate_synthetic_data``. The real
    S3/Postgres reads and the Postgres write are left as comments. The Spark
    session is always stopped, even if a step raises.
    """
    spark = _build_spark_session()
    try:
        # 1. Read orders from S3.
        print("=== –®–ê–ì 1: –ß–¢–ï–ù–ò–ï –î–ê–ù–ù–´–• –ò–ó S3 ===")
        print(f"üìÅ –ü—É—Ç—å: {ORDERS_S3_PATH}")

        # In production: orders_df = spark.read.parquet(ORDERS_S3_PATH)
        orders_df, active_customers_df = generate_synthetic_data(spark)
        _print_order_stats(orders_df)

        # 2. Read active customers from Postgres.
        print("\n=== –®–ê–ì 2: –ß–¢–ï–ù–ò–ï –î–ê–ù–ù–´–• –ò–ó POSTGRES ===")
        print("üìä –¢–∞–±–ª–∏—Ü–∞: orders_filtered")
        print(f"üîó –ü–æ–¥–∫–ª—é—á–µ–Ω–∏–µ: {PG_HOST}:5432/{PG_DATABASE}")

        # In production:
        # postgres_url = f"jdbc:postgresql://{PG_HOST}:5432/{PG_DATABASE}"
        # connection_properties = {"user": PG_USER, "password": PG_PASSWORD, "driver": "org.postgresql.Driver"}
        # active_customers_df = spark.read.jdbc(url=postgres_url, table="orders_filtered", properties=connection_properties)
        _print_customer_stats(active_customers_df)

        # 3. Join orders with active customers.
        joined_df = _join_orders_with_customers(orders_df, active_customers_df)

        # 4. Build the result set.
        result_df = _build_result(joined_df)

        # 5. Write the result to Postgres.
        print("\n=== –®–ê–ì 5: –ó–ê–ü–ò–°–¨ –†–ï–ó–£–õ–¨–¢–ê–¢–û–í –í POSTGRES ===")
        print("üíæ –¢–∞–±–ª–∏—Ü–∞: task_4_result_table")

        # In production:
        # postgres_url = f"jdbc:postgresql://{PG_HOST}:5432/{PG_DATABASE}"
        # connection_properties = {"user": PG_USER, "password": PG_PASSWORD, "driver": "org.postgresql.Driver"}
        # result_df.write.mode("overwrite").jdbc(url=postgres_url, table="task_4_result_table", properties=connection_properties)
        # NOTE(review): the write above is disabled in this demo, so the success
        # message below is not backed by an actual write.

        # Count once and reuse it for the summary (each count() is a separate Spark job).
        final_count = result_df.count()
        print("‚úÖ –î–∞–Ω–Ω—ã–µ —É—Å–ø–µ—à–Ω–æ –∑–∞–ø–∏—Å–∞–Ω—ã")
        print(f"üìä –ó–∞–ø–∏—Å–∞–Ω–æ –∑–∞–ø–∏—Å–µ–π: {final_count:,}")

        _print_etl_summary(orders_df, active_customers_df, joined_df, result_df, final_count)
    finally:
        spark.stop()


if __name__ == "__main__":
    main()

=== –®–ê–ì 1: –ß–¢–ï–ù–ò–ï –î–ê–ù–ù–´–• –ò–ó S3 ===
üìÅ –ü—É—Ç—å: s3a://startde-project/roma_mesh/orders_filtered

üìà –°—Ç–∞—Ç–∏—Å—Ç–∏–∫–∞ –ø–æ —Å—Ç–∞—Ç—É—Å–∞–º –∑–∞–∫–∞–∑–æ–≤:


  for column, series in pdf.iteritems():
                                                                                

+------------+-----+
|order_status|count|
+------------+-----+
|     shipped|   91|
|    canceled|   53|
|   delivered|  755|
|  processing|  101|
+------------+-----+

üí∞ –°—Ç–∞—Ç–∏—Å—Ç–∏–∫–∞ –ø–æ —Å—Ç–æ–∏–º–æ—Å—Ç–∏ –∑–∞–∫–∞–∑–æ–≤:
+------------------+-----------------+---------+
|         avg_value|        max_value|min_value|
+------------------+-----------------+---------+
|151.16567101617323|306.8874266829997|     20.0|
+------------------+-----------------+---------+


–ü—Ä–∏–º–µ—Ä –¥–∞–Ω–Ω—ã—Ö –∑–∞–∫–∞–∑–æ–≤:
+----------+-----------+------------------------+------------+------------------+
|  order_id|customer_id|order_purchase_timestamp|order_status|       order_value|
+----------+-----------+------------------------+------------+------------------+
|ORD_000001|  CUST_0103|              2023-08-24|     shipped|148.33850046777695|
|ORD_000002|  CUST_0180|              2023-03-24|   delivered|  139.594157136373|
|ORD_000003|  CUST_0093|              2023-02-11|   delivered|143.