In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_unixtime
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, ArrayType

# Spark Session oluşturma
spark = SparkSession.builder \
    .appName("Parquet Reader") \
    .getOrCreate()

In [33]:
schema = StructType([
    StructField("session_id", IntegerType(), True),\
    StructField("time_stamp", StringType(), True),\
    StructField("user_id", IntegerType(), True),\
    StructField("total_price", FloatType(), True),\
    StructField("order_id", IntegerType(), True),\
    StructField("payment_type", StringType(), True),\
    StructField("products", ArrayType(StructType([
        StructField("product_id", IntegerType(), True),\
        StructField("price", FloatType(), True),\
        StructField("discount", IntegerType(), True),\
        StructField("item_count", IntegerType(), True)
    ])), True)
])

In [34]:
purchased_items_df = spark.read.option("mergeSchema", "true").schema(schema).parquet("hdfs://namenode:9000/PurchasedItems")

In [35]:
purchased_items_df = purchased_items_df.withColumn("time_stamp", from_unixtime(col("time_stamp")))

In [36]:
purchased_items_df.show()

+----------+----------+-------+-----------+--------+------------+--------+
|session_id|time_stamp|user_id|total_price|order_id|payment_type|products|
+----------+----------+-------+-----------+--------+------------+--------+
|      NULL|      NULL|   NULL|       NULL|    NULL|        NULL|    NULL|
|      NULL|      NULL|   NULL|       NULL|    NULL|        NULL|    NULL|
|      NULL|      NULL|   NULL|       NULL|    NULL|        NULL|    NULL|
|      NULL|      NULL|   NULL|       NULL|    NULL|        NULL|    NULL|
|      NULL|      NULL|   NULL|       NULL|    NULL|        NULL|    NULL|
|      NULL|      NULL|   NULL|       NULL|    NULL|        NULL|    NULL|
|      NULL|      NULL|   NULL|       NULL|    NULL|        NULL|    NULL|
|      NULL|      NULL|   NULL|       NULL|    NULL|        NULL|    NULL|
|      NULL|      NULL|   NULL|       NULL|    NULL|        NULL|    NULL|
|      NULL|      NULL|   NULL|       NULL|    NULL|        NULL|    NULL|
|      NULL|      NULL|  

In [38]:
most_sold_products = purchased_items_df.groupBy("products.product_id").count().orderBy("count", ascending=False).limit(10)

In [39]:
most_preferred_payment_type = purchased_items_df.groupBy("payment_type").count().orderBy("count", ascending=False).first()

In [40]:
from pyspark.sql.functions import hour
from pyspark.sql.window import Window
from pyspark.sql import functions as F

windowSpec = Window.partitionBy("user_id").orderBy(F.col("total_price").desc())
top_10_customers_last_hour = purchased_items_df.withColumn("hour", hour("time_stamp")) \
    .filter(F.col("hour") == (F.hour(F.current_timestamp()) - 1)) \
    .withColumn("rank", F.rank().over(windowSpec)) \
    .filter(F.col("rank") <= 10) \
    .select("user_id", "total_price") \
    .distinct()

In [41]:
from pyspark.sql.functions import collect_set

customers_multiple_purchases = purchased_items_df.groupBy("user_id").agg(collect_set("products.product_id").alias("purchased_products")) \
    .filter(F.size("purchased_products") > 1)

In [43]:
most_sold_products.write.format("jdbc").options(
    url='jdbc:postgresql://your_postgres_server:5432/airflow',
    driver='org.postgresql.Driver',
    dbtable='most_sold_products',
    user='airflow',
    password='airflow').mode('overwrite').save()

Py4JJavaError: An error occurred while calling o232.save.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:246)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:250)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


In [44]:
print("En çok tercih edilen ödeme tipi:")
print(most_preferred_payment_type)

En çok tercih edilen ödeme tipi:
Row(payment_type=None, count=1464)


In [45]:
print("Son 1 saatte en yüksek tutarlı siparişi veren top 10 müşteriler:")
top_10_customers_last_hour.show()

Son 1 saatte en yüksek tutarlı siparişi veren top 10 müşteriler:
+-------+-----------+
|user_id|total_price|
+-------+-----------+
+-------+-----------+



In [None]:
print("Aynı ürünü birden çok kez satın alan müşteriler ve birden çok aldıkları ürünler:")
customers_multiple_purchases.show()