# Setup

In [1]:
# Environment setup: a Java 8 JDK for Spark, plus the Python packages used below.
# NOTE(review): kafka-python is not imported anywhere in this notebook (Spark's own
# Kafka connector is used instead) — confirm it is still needed.
# NOTE(review): only pyspark is pinned; pin the other packages for reproducible re-runs.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark==3.5.0
!pip install kafka-python
!pip install pymongo[srv] dnspython

Collecting pyspark==3.5.0
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.7 (from pyspark==3.5.0)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m12.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425346 sha256=a39be01a12ae6d9a59a39a5d418956b0d9033514fb5e7ce4925bf2e6421243d4
  Stored in directory: /root/.cache/pip/wheels/84/40/20/65eefe766118e0a8f8e385cc3ed6e9eb7241c7e51cfc04c51a
Successfully built pyspark
Installing collected packages: py4j, pyspark
  

In [2]:
import os
# Pull in Spark's Kafka connector (spark-sql-kafka-0-10, Scala 2.12, version 3.5.0 to
# match pyspark==3.5.0). Must be set before the SparkSession cell below runs.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell"

In [3]:
from pyspark.sql import SparkSession

# The one Spark session every later cell uses; getOrCreate() returns the
# already-running session if this cell is executed again.
session = (
    SparkSession.builder
    .appName("Kafka Projesi")
    .getOrCreate()
)

In [4]:
# Bundle the client certificate (service.cert) and its private key (service.key)
# into the PKCS#12 keystore client.keystore.p12 used by the Kafka SSL options below.
# The -passout password must match KEYSTORE_PASSWORD in the next cell.
!openssl pkcs12 -export \
    -inkey service.key \
    -in service.cert \
    -out client.keystore.p12 \
    -name service_key \
    -passout pass:123456

# Kafka Bağlantısı

In [5]:
# Kafka connection settings, also reused by the streaming reader further down.
KEYSTORE_PATH = "client.keystore.p12"
# Password of the PKCS#12 keystore; must match `-passout` in the openssl cell.
# Read from the environment so the secret is not hard-coded in the notebook; the
# fallback keeps the original value. Kept as a str because Spark options are strings.
KEYSTORE_PASSWORD = os.environ.get("KAFKA_KEYSTORE_PASSWORD", "123456")

CA_PEM_PATH = "ca.pem"
BOOTSTRAP_SERVERS = "kafka-server-address:port"

# Batch (non-streaming) read of the "ecommerce-data" topic over SSL: the broker
# certificate is verified against ca.pem and the client presents the keystore.
try:
    rawDf = (
        session.read.format("kafka")
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
        .option("subscribe", "ecommerce-data")
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.truststore.type", "PEM")
        .option("kafka.ssl.truststore.location", CA_PEM_PATH)
        .option("kafka.ssl.keystore.type", "PKCS12")
        .option("kafka.ssl.keystore.location", KEYSTORE_PATH)
        .option("kafka.ssl.keystore.password", KEYSTORE_PASSWORD)
        .load()
    )

    rawDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(truncate=False)

except Exception as e:
    print(f"HATA: {e}")
    # Re-raise instead of swallowing: every later cell uses rawDf, so carrying on
    # would only surface later as a confusing NameError.
    raise

+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|key |value                                                                                                                                                                                                         |
+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|NULL|{"OrderId":"33c87164-f9ac-45e5-ba3c-c701c1ddff3c","CustomerName":"Narciso Cartwright","Product":"Ergonomic Soft Pizza","Price":988.18,"City":"Port Marilynechester","OrderDate":"2025-12-25T07:58:00.186843Z"}|
|NULL|{"OrderId":"11ad579a-0602-4abc-8a57-1b7b7c0f8b10","CustomerName":"Clement Greenholt","Product":"Incredible Rubber Ball","Price":670.82,"Ci

In [6]:
# Raw Kafka records: binary key/value plus topic/partition/offset/timestamp metadata.
rawDf.show(5)

+----+--------------------+--------------+---------+------+--------------------+-------------+
| key|               value|         topic|partition|offset|           timestamp|timestampType|
+----+--------------------+--------------+---------+------+--------------------+-------------+
|NULL|[7B 22 4F 72 64 6...|ecommerce-data|        0|     0|2025-12-25 07:58:...|            0|
|NULL|[7B 22 4F 72 64 6...|ecommerce-data|        0|     1|2025-12-25 07:58:...|            0|
|NULL|[7B 22 4F 72 64 6...|ecommerce-data|        0|     2|2025-12-25 07:58:...|            0|
|NULL|[7B 22 4F 72 64 6...|ecommerce-data|        0|     3|2025-12-25 07:58:...|            0|
|NULL|[7B 22 4F 72 64 6...|ecommerce-data|        0|     4|2025-12-25 07:58:...|            0|
+----+--------------------+--------------+---------+------+--------------------+-------------+
only showing top 5 rows



In [7]:
# Explicit imports instead of `import *`: pyspark.sql.functions defines sum, max,
# min, round, abs, filter, ... which would silently shadow the Python built-ins.
# These are all the names the rest of the notebook uses.
from pyspark.sql.functions import col, current_timestamp, expr, from_json
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Schema of the JSON order messages on the "ecommerce-data" topic
# (all fields nullable, same as StructType.add's default).
schema = StructType([
    StructField("OrderId", StringType()),
    StructField("CustomerName", StringType()),
    StructField("Product", StringType()),
    StructField("Price", DoubleType()),
    StructField("City", StringType()),
    StructField("OrderDate", TimestampType()),
])

In [8]:
# Kafka delivers `value` as raw bytes (show() merely renders them as hex):
# cast to string, parse the JSON with `schema`, and flatten into columns.
orderDf = rawDf.selectExpr("CAST(value AS STRING)").select(from_json(col("value"), schema).alias("data")).select("data.*")
orderDf.show()

+--------------------+--------------------+--------------------+------+--------------------+--------------------+
|             OrderId|        CustomerName|             Product| Price|                City|           OrderDate|
+--------------------+--------------------+--------------------+------+--------------------+--------------------+
|33c87164-f9ac-45e...|  Narciso Cartwright|Ergonomic Soft Pizza|988.18|Port Marilynechester|2025-12-25 07:58:...|
|11ad579a-0602-4ab...|   Clement Greenholt|Incredible Rubber...|670.82|         Laurineberg|2025-12-25 07:58:...|
|71dd4aaa-444a-445...|          Mina Bayer| Gorgeous Wooden Car|786.41|         Ritchiebury|2025-12-25 07:58:...|
|377d91eb-6ff6-40f...|       Modesto Lakin|Awesome Cotton Pizza|345.32|         Monahantown|2025-12-25 07:58:...|
|932a4364-8d7c-4ec...|    Adrienne Friesen|Refined Granite G...|875.59|           East Bart|2025-12-25 07:58:...|
|4787f049-3b99-4ec...|      Romaine Paucek|  Small Wooden Shirt|841.79|   Lake Carolynef

In [9]:
# Number of orders in the batch read of the topic.
orderDf.count()

4648

# Batch veri analizi

## En pahalı alışverişler

In [10]:
# Batch: orders sorted by Price, most expensive first.
orderDf.orderBy("Price",ascending=False).show()

+--------------------+-----------------+--------------------+----------+-------------+--------------------+
|             OrderId|     CustomerName|             Product|     Price|         City|           OrderDate|
+--------------------+-----------------+--------------------+----------+-------------+--------------------+
|a0ad66b0-225a-4a7...|    Eliane Stokes| Awesome Metal Chair|9793419.26|      Ardahan|2025-12-27 06:59:...|
|8baeab19-9519-409...|    Pablo Flatley|Licensed Concrete...|9251785.17|      Tunceli|2025-12-27 06:59:...|
|1b499b20-4043-46d...|    Nestor Hansen|Sleek Frozen Chicken|9161292.33|      Giresun|2025-12-27 06:59:...|
|11e1e76b-1ad7-49d...|      Flo Ullrich|  Tasty Concrete Hat|9151492.85|        Kilis|2025-12-27 06:59:...|
|5e698fe2-320f-497...|    Kavon Volkman|Practical Frozen ...|9127437.38|        Nigde|2025-12-27 06:59:...|
|811c3d56-3703-4f3...|    Demarcus Kihn|Tasty Plastic Sau...|8290665.17|        Kilis|2025-12-27 06:59:...|
|6e685014-4db7-4a9...|   Bra

## En çok satan ürünler

In [11]:
# Batch: number of orders per product, most frequent first.
orderDf.groupBy("Product").count().orderBy("count",ascending=False).show()

+--------------------+-----+
|             Product|count|
+--------------------+-----+
|Generic Steel Sau...|    6|
|Awesome Plastic T...|    6|
|Rustic Soft Sausages|    5|
|   Tasty Fresh Shoes|    5|
|Incredible Cotton...|    5|
|   Tasty Rubber Ball|    5|
|    Sleek Soft Mouse|    5|
| Refined Steel Chips|    5|
|Intelligent Cotto...|    5|
| Rustic Frozen Chair|    5|
|Handcrafted Rubbe...|    5|
|Unbranded Cotton ...|    5|
|Refined Steel Gloves|    5|
|Handcrafted Steel...|    5|
| Unbranded Fresh Car|    4|
|Fantastic Concret...|    4|
| Handmade Soft Chips|    4|
|Fantastic Frozen ...|    4|
|Awesome Wooden Bacon|    4|
|Ergonomic Concret...|    4|
+--------------------+-----+
only showing top 20 rows



## Son 1 saatteki alışverişlerin listesi

In [12]:

# Orders whose OrderDate is later than "now minus one hour" (Spark's current time).
last_hour_df = orderDf.where(
    col("OrderDate") > (current_timestamp() - expr("INTERVAL 1 HOUR"))
)

last_hour_df.show(truncate=False)

+------------------------------------+----------------------+--------------------------+--------+---------+--------------------------+
|OrderId                             |CustomerName          |Product                   |Price   |City     |OrderDate                 |
+------------------------------------+----------------------+--------------------------+--------+---------+--------------------------+
|1b2db324-f505-47b1-b56b-9642cae628a1|Glenda Howell         |Refined Soft Tuna         |66305.96|Aydin    |2025-12-28 17:09:19.716925|
|18cf1e66-b8a4-4e84-96e4-7624cbf95e58|Rosalee Langworth     |Practical Concrete Towels |24984.58|Kocaeli  |2025-12-28 17:09:22.02304 |
|fd40f536-dc28-49a4-a2cf-d2ac8be2f10d|Trey Howell           |Handmade Concrete Fish    |30675.35|Mardin   |2025-12-28 17:09:23.122202|
|f5885330-997c-4039-bf64-146c1a2b4a35|Meda Roberts          |Intelligent Frozen Pizza  |6884.01 |Artvin   |2025-12-28 17:09:24.218469|
|58f787ab-08e4-4c2a-80fc-06e319bec139|Providenci Ruther

## Son 1 saatteki alışverişlerin sayısı

In [13]:
# Number of orders placed within the last hour.
last_hour_df.count()

229

# Stream veri için Kafka bağlantısı

In [14]:

# Streaming read of the same topic with the same SSL settings; only messages
# produced after the query starts are consumed (startingOffsets=latest).
# Named rawStreamDf rather than rawDf so it does not overwrite the batch DataFrame
# that earlier cells (rawDf.show, orderDf) are built from. Otherwise re-running
# those cells after this one would hit a streaming DataFrame and fail.
rawStreamDf = (
    session.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("subscribe", "ecommerce-data")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.type", "PEM")
    .option("kafka.ssl.truststore.location", CA_PEM_PATH)
    .option("kafka.ssl.keystore.type", "PKCS12")
    .option("kafka.ssl.keystore.location", KEYSTORE_PATH)
    .option("kafka.ssl.keystore.password", KEYSTORE_PASSWORD)
    .option("startingOffsets", "latest")
    .load()
)

# Same parsing as the batch path: bytes -> string -> JSON struct -> columns.
df_parsed = (
    rawStreamDf.selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

# Append every parsed order to the in-memory table "orders", which later cells
# query via session.table("orders") / session.sql(...).
# NOTE(review): the memory sink keeps all rows in driver memory and grows unbounded.
query = (
    df_parsed.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("orders")
    .start()
)


In [28]:

# DataFrame over the in-memory "orders" table that the streaming query appends to.
df = session.table("orders")

df.show(5, truncate=False)

+------------------------------------+-----------------+---------------------+--------+---------+--------------------------+
|OrderId                             |CustomerName     |Product              |Price   |City     |OrderDate                 |
+------------------------------------+-----------------+---------------------+--------+---------+--------------------------+
|56aa7fef-b8ea-47d7-bd90-0b96bffbf748|Alycia Bartoletti|Handmade Frozen Shirt|31431.76|Usak     |2025-12-28 17:13:47.960944|
|6c08b7d2-cbb3-4f20-9be3-fccc3d62084a|Cindy Conroy     |Gorgeous Steel Tuna  |25946.59|Karaman  |2025-12-28 17:13:49.063792|
|25de2cf8-9c74-4b7c-9f9d-f1ebb3508451|Arno Senger      |Small Wooden Car     |20027.87|Ordu     |2025-12-28 17:13:50.142414|
|a3a96cdf-0f1f-4584-8a39-d53d6f9d410a|Deon Little      |Practical Cotton Bike|63293.49|Rize     |2025-12-28 17:13:51.230784|
|21d3822a-fade-4831-9cad-097bf3cc770b|Raheem Daniel    |Small Plastic Bacon  |82309.37|Gaziantep|2025-12-28 17:13:52.328318|


# Stream veri analizi

## Stream veri sayısı

In [40]:
# Number of orders received by the stream so far.
df.count()

306

## Stream verilere göre anlık olarak yapılan en pahalı alışverişler

In [41]:
# Stream: orders received so far, most expensive first.
df.orderBy("Price",ascending=False).show()

+--------------------+-----------------+--------------------+--------+---------+--------------------+
|             OrderId|     CustomerName|             Product|   Price|     City|           OrderDate|
+--------------------+-----------------+--------------------+--------+---------+--------------------+
|eb20ebf9-d306-426...|       Zane Upton|Ergonomic Rubber ...|99681.12|Kirikkale|2025-12-28 17:15:...|
|8a4a0b35-2f9c-45a...|  Reina Kertzmann|Ergonomic Metal G...|99238.16|      Mus|2025-12-28 17:17:...|
|9fdac3ce-bd3b-497...|    Rebeka Carter|Small Plastic Chi...|98773.49|    Hatay|2025-12-28 17:14:...|
|8c6bc2ce-6403-446...| Abraham Baumbach|Generic Cotton Ch...|97973.23| Adiyaman|2025-12-28 17:15:...|
|96f8005e-9724-42e...|   Randal Ziemann|Sleek Cotton Keyb...|97595.11|   Samsun|2025-12-28 17:14:...|
|88f07b6c-b937-4ef...|      Benton Lind|Rustic Rubber Chi...|97417.89|   Mersin|2025-12-28 17:18:...|
|dce74be7-4291-489...| Brendan Nitzsche|Small Granite Gloves|96822.89| Tekirdag|20

## Stream verilere göre anlık olarak en çok satılan ürünler

In [42]:
# Stream: number of orders per product, most frequent first.
df.groupBy("Product").count().orderBy("count",ascending=False).show()

+--------------------+-----+
|             Product|count|
+--------------------+-----+
|   Generic Steel Hat|    2|
| Generic Steel Chair|    2|
|Intelligent Froze...|    2|
|Small Granite Gloves|    2|
|Unbranded Frozen ...|    2|
|Fantastic Plastic...|    2|
|Licensed Metal Mouse|    2|
| Sleek Plastic Chips|    2|
|    Rustic Metal Hat|    2|
|Incredible Metal ...|    2|
|Ergonomic Metal Tuna|    2|
|Handmade Cotton S...|    1|
|    Sleek Frozen Car|    1|
| Small Wooden Cheese|    1|
|Rustic Cotton Cheese|    1|
|Awesome Concrete ...|    1|
|Incredible Wooden...|    1|
|Handmade Steel Sa...|    1|
|  Sleek Rubber Bacon|    1|
|Unbranded Rubber ...|    1|
+--------------------+-----+
only showing top 20 rows



## Stream verilere göre anlık olarak en çok alışveriş yapılan şehirler

In [43]:
# Stream: number of orders per city, most frequent first.
df.groupBy("City").count().orderBy("count",ascending=False).show()

+----------+-----+
|      City|count|
+----------+-----+
|Kirklareli|    8|
|  Nevsehir|    8|
|     Sivas|    7|
|     Tokat|    7|
|    Elazig|    7|
|    Samsun|    7|
|   Isparta|    6|
|    Edirne|    6|
|   Kutahya|    6|
|       Mus|    6|
| Canakkale|    6|
| Gaziantep|    6|
|   Kayseri|    5|
| Balikesir|    5|
|     Igdir|    5|
|     Hatay|    5|
|     Corum|    5|
|    Yalova|    5|
|  Kirsehir|    5|
|     Siirt|    5|
+----------+-----+
only showing top 20 rows



## Stream verilere göre anlık olarak en çok harcama yapan şehirler

In [44]:
# Stream: total spend (sum of Price) per city, highest first.
# The raw double sums show float noise (e.g. ...45000000007); round for display if needed.
df.groupBy("City").sum("Price").orderBy("sum(Price)",ascending=False).show()

+----------+------------------+
|      City|        sum(Price)|
+----------+------------------+
|Kirklareli|         465504.05|
|  Nevsehir|          452996.2|
|     Tokat|         450592.76|
|    Samsun|         427785.12|
| Canakkale|370323.45000000007|
|    Elazig|         334358.61|
|      Bolu|         312222.61|
| Kirikkale|         310584.61|
|    Edirne|         296165.99|
|  Adiyaman|290222.44999999995|
|   Isparta|289148.91000000003|
|      Agri|288928.43000000005|
|    Artvin|         284223.06|
|     Izmir|280719.29000000004|
|   Kayseri|         273621.34|
|Diyarbakir|         269126.81|
| Gumushane|         268453.11|
| Gaziantep|         266463.95|
|    Bartin|261372.09999999998|
|     Bursa|         254600.68|
+----------+------------------+
only showing top 20 rows



## Stream verilere göre anlık olarak en çok alışveriş yapan müşteriler

In [45]:
# Stream: number of orders per customer name, most frequent first.
df.groupBy("CustomerName").count().orderBy("count",ascending=False).show()

+-----------------+-----+
|     CustomerName|count|
+-----------------+-----+
|     Paolo Maggio|    1|
|     Ernest Ortiz|    1|
|   Vincent Waters|    1|
|     Robin Heaney|    1|
|   Amie O'Connell|    1|
|    Makayla Bauch|    1|
|Jerrold O'Connell|    1|
|   Karlie Lebsack|    1|
|        Lily Veum|    1|
|     Anne Bradtke|    1|
|       Flo Deckow|    1|
|     Theresa Rowe|    1|
|   Elian Franecki|    1|
| Maryjane Kilback|    1|
|  Bradley Spencer|    1|
|      Keyon Braun|    1|
| Jackeline Herzog|    1|
|       Tony Purdy|    1|
|        Kyle Moen|    1|
|        Ivy Bauch|    1|
+-----------------+-----+
only showing top 20 rows



# Şüpheli işlemlerin tespiti

In [46]:
import pymongo

# MongoDB connection string used by the anomaly sink. Read from the environment so
# credentials embedded in the URI are not saved with the notebook; the fallback is
# the original placeholder.
MONGO_URI = os.environ.get("MONGO_URI", "mongodb+srv:uri")

In [47]:
# Per-city reference statistics (mean and STDDEV of Price) computed from the
# in-memory "orders" table filled by the streaming query above.
# This is a one-off snapshot: it is cached here and is NOT recomputed as new
# orders arrive, so re-run this cell to refresh the baseline.
stats_df = session.sql("""
    SELECT
        City,
        AVG(Price) as RefMean,
        STDDEV(Price) as RefStdDev
    FROM orders
    GROUP BY City
""")
stats_df.cache()
stats_city_count = stats_df.count()  # materializes the cache
if stats_city_count == 0:
    # With an empty baseline, every row joined in the anomaly check gets NULL
    # RefMean/RefStdDev, the Price comparison is NULL, and nothing is ever flagged.
    print("WARNING: 'orders' is empty; re-run this cell once the stream has data, "
          "otherwise no anomalies can be detected.")
stats_city_count

0

## Şüpheli işlemlerin MongoDB'ye kaydedilmesi

In [48]:
def write_anomalies_to_mongo(batch_df, epoch_id, std_multiplier=2.0):
    """Flag unusually expensive orders in one micro-batch and store them in MongoDB.

    Intended as a ``foreachBatch`` sink: Spark calls it once per micro-batch.
    A row is an anomaly when its ``Price`` is greater than its city's
    ``RefMean + std_multiplier * RefStdDev``, taken from the global ``stats_df``.
    Anomalous rows (order columns plus RefMean/RefStdDev) are inserted into
    ``EcommerceDB.FraudAlerts``.

    Args:
        batch_df: Parsed orders of the current micro-batch.
        epoch_id: Micro-batch id supplied by Spark (not used here).
        std_multiplier: Number of standard deviations above the city mean that
            counts as suspicious; defaults to the original threshold of 2.

    Rows whose city is absent from ``stats_df``, or has a NULL RefStdDev, are
    never flagged, because the comparison evaluates to NULL after the left join.
    MongoDB errors are printed and swallowed so one failed write does not stop
    the streaming query; the anomalies of that batch are then not stored.
    """
    # Cache before the first action so the batch is not recomputed from Kafka
    # for each of the actions below.
    batch_df.cache()
    try:
        if batch_df.isEmpty():
            return

        anomalies = batch_df.join(stats_df, "City", "left").filter(
            col("Price") > (col("RefMean") + (std_multiplier * col("RefStdDev")))
        )

        # One collect() instead of count() followed by collect(), which evaluated
        # the join twice.
        data_to_save = [row.asDict() for row in anomalies.collect()]
        if not data_to_save:
            return

        try:
            # The context manager closes the client even when insert_many raises.
            with pymongo.MongoClient(MONGO_URI) as client:
                collection = client["EcommerceDB"]["FraudAlerts"]
                collection.insert_many(data_to_save)
        except Exception as e:
            print(f"Hata: {e}")
    finally:
        # Always release the cached batch, including on early return or error.
        batch_df.unpersist()



In [49]:

# Second streaming query on the parsed order stream: every micro-batch is handed
# to write_anomalies_to_mongo, which stores suspicious orders in MongoDB.
query_mongo = (
    df_parsed.writeStream
    .outputMode("append")
    .foreachBatch(write_anomalies_to_mongo)
    .start()
)

## MongoDB'ye olan stream işleminin durdurulması

In [26]:
# Stop only the MongoDB anomaly query; the in-memory "orders" query (`query`) keeps running.
query_mongo.stop()