In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session that talks to MinIO through the S3A connector
# and requests the PostgreSQL JDBC driver via spark.jars.packages.
# NOTE(review): the MinIO access/secret keys are hard-coded below — consider
# loading them from the environment before sharing this notebook.
spark = (
    SparkSession.builder
    .appName("MinIOParquetRead")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

# Load the Parquet data stored under the bucket root and preview it.
df = spark.read.parquet("s3a://purchased-items/")
df.show()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, sum as spark_sum, desc, countDistinct, count
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType, TimestampType
from datetime import datetime, timedelta

# Create the Spark session, or reuse the one already running in this kernel.
# The PostgreSQL JDBC package is requested here too, matching the first cell:
# spark.jars.packages is only honoured when the session is first started, so if
# this cell is the one that starts it, the later JDBC write/read cells would
# otherwise fail because the driver is not on the classpath.
# NOTE(review): the MinIO access/secret keys are hard-coded below — consider
# loading them from the environment before sharing this notebook.
spark = (
    SparkSession.builder
    .appName("MinIOParquetRead")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

# Read the Parquet files from the bucket.
df = spark.read.parquet("s3a://purchased-items/")

# Inspect the inferred schema and a small, untruncated sample of rows.
df.printSchema()
df.show(5, truncate=False)


In [None]:
from pyspark.sql.functions import explode, sum as spark_sum

# Explode the Products array so that each purchased product gets its own row,
# keeping the owning UserId and OrderId alongside it.
products_exploded = df.select("UserId", "OrderId", explode("Products").alias("Product"))

# Total units sold per product, best sellers first.
# The sort references the "TotalSold" alias instead of repeating the aggregate
# expression, so the ordering is tied directly to the column that is displayed.
product_sales = (
    products_exploded.groupBy("Product.ProductId")
    .agg(spark_sum("Product.ItemCount").alias("TotalSold"))
    .orderBy("TotalSold", ascending=False)
)

product_sales.show(10, truncate=False)


In [None]:
# Number of orders per payment type, most frequent first.
payment_type_counts = (
    df.groupBy("PaymentType")
    .count()
    .sort(col("count").desc())
)

payment_type_counts.show()


In [None]:
from pyspark.sql.functions import expr, col, current_timestamp

# Bounds of the look-back window: from one hour ago up to the current timestamp.
window_end = current_timestamp()
window_start = window_end - expr("INTERVAL 1 HOURS")

# Orders whose TimeStamp lies inside the window (both bounds inclusive).
recent_orders_1h = df.where(
    (col("TimeStamp") >= window_start) & (col("TimeStamp") <= window_end)
)

recent_orders_1h.show()

# Sum TotalPrice per customer over those orders, then keep the ten biggest spenders.
spend_per_customer_1h = recent_orders_1h.groupBy("UserId").agg(
    spark_sum("TotalPrice").alias("TotalSpent")
)
top_customers_1h = spend_per_customer_1h.sort(col("TotalSpent").desc()).limit(10)

top_customers_1h.show()


In [None]:
# Show the earliest and latest TimeStamp values present in the data.
timestamp_bounds = df.selectExpr("min(TimeStamp)", "max(TimeStamp)")
timestamp_bounds.show()


In [None]:
# Re-read the raw purchases from MinIO and check which columns are available.
df = spark.read.format("parquet").load("s3a://purchased-items/")
df.printSchema()


In [None]:
from pyspark.sql.functions import explode, sum as spark_sum, col, countDistinct

# This cell reuses `products_exploded` (one row per exploded product, with its
# UserId) built in an earlier cell from the raw purchases DataFrame.
# If `df` no longer refers to the DataFrame read from MinIO, re-run the earlier
# cells, or rebuild it with:
# products_exploded = df.select("UserId", explode("Products").alias("Product"))

# Total purchased item count per (customer, product) pair.
customer_product_counts = products_exploded.groupBy("UserId", "Product.ProductId").agg(
    spark_sum("Product.ItemCount").alias("TotalCount")
)

# Pairs where the customer's total item count for that product exceeds 1.
multiple_products_customers = customer_product_counts.where(col("TotalCount") > 1)

# Customers that have such repeat purchases for more than one distinct product.
# After the aggregation above the grouping column is exposed as plain
# "ProductId" (not "Product.ProductId"), so that is the name used here.
repeat_products_per_customer = multiple_products_customers.groupBy("UserId").agg(
    countDistinct("ProductId").alias("DistinctProducts")
)
multiple_products_customers_summary = repeat_products_per_customer.where(
    col("DistinctProducts") > 1
)

multiple_products_customers_summary.show()


In [None]:
# Print the schema of the per-(customer, product) totals to confirm the column
# names produced by the groupBy/agg in the previous cell.
customer_product_counts.printSchema()


In [None]:
# JDBC connection settings for the Postgres database.
# NOTE(review): the database user and password are hard-coded — consider
# loading them from the environment before sharing this notebook.
jdbc_url = "jdbc:postgresql://postgres:5432/airflow"
connection_properties = dict(
    user="airflow",
    password="airflow",
    driver="org.postgresql.Driver",
)

# Write the product_sales DataFrame to the "product_sales" table.
# With overwrite mode Spark replaces any existing table contents, so no manual
# truncate is required beforehand. By default the table is dropped and
# recreated; the JDBC writer's "truncate" option can be enabled if the existing
# table definition should be kept instead.
product_sales.write.mode("overwrite").jdbc(
    jdbc_url, "product_sales", properties=connection_properties
)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as spark_sum, desc

# Reuse the running SparkSession if there is one; otherwise start a new one
# with the PostgreSQL JDBC driver package requested.
spark = (
    SparkSession.builder
    .appName("PostgresQuery")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .getOrCreate()
)

# JDBC connection settings for the Postgres database.
# NOTE(review): the database user and password are hard-coded — consider
# loading them from the environment before sharing this notebook.
jdbc_url = "jdbc:postgresql://postgres:5432/airflow"
connection_properties = dict(
    user="airflow",
    password="airflow",
    driver="org.postgresql.Driver",
)

# Read the product_sales table back from Postgres.
# NOTE(review): this rebinds `df`, which earlier cells use for the raw purchases
# DataFrame — re-running those cells after this one will see the Postgres table.
df = spark.read.jdbc(jdbc_url, "product_sales", properties=connection_properties)

# Top 10 products by total units sold (TotalSold summed per ProductId).
purchases_per_product = df.groupBy("ProductId").agg(
    spark_sum("TotalSold").alias("TotalPurchasedCount")
)
result = purchases_per_product.sort(desc("TotalPurchasedCount")).limit(10)

result.show()
