In [None]:

import os

from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import DecimalType, FloatType, IntegerType, StringType, StructField, StructType
# Configuration: every connection setting comes from the environment so no secrets live in the notebook.
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY")
S3_ENDPOINT = os.getenv("S3_ENDPOINT")
BUCKET_NAME = os.getenv("BUCKET_NAME")
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
POSTGRES_ENDPOINT = os.getenv("POSTGRES_ENDPOINT")  # not used in the visible cells; kept for other cells
# JDBC URL of the Postgres sink. The historical misspelled variable DATA_ENPOINT is still
# read first (so existing .env files behave exactly as before); the correctly spelled
# DATA_ENDPOINT is accepted as a fallback.
DATA_ENPOINT = os.getenv("DATA_ENPOINT") or os.getenv("DATA_ENDPOINT")
# PostgreSQL JDBC driver jar; override per machine via the environment instead of editing the notebook.
POSTGRES_JDBC_JAR = os.getenv("POSTGRES_JDBC_JAR", "/home/harshithts/jars/postgresql-42.7.3.jar")

# Fail fast: a missing variable would otherwise only surface once the streams start,
# e.g. as a bucket literally named "None" in the s3a:// paths or an opaque auth error.
_required_settings = {
    "S3_ACCESS_KEY": S3_ACCESS_KEY,
    "S3_SECRET_KEY": S3_SECRET_KEY,
    "S3_ENDPOINT": S3_ENDPOINT,
    "BUCKET_NAME": BUCKET_NAME,
    "POSTGRES_USER": POSTGRES_USER,
    "POSTGRES_PASSWORD": POSTGRES_PASSWORD,
    "DATA_ENPOINT (or DATA_ENDPOINT)": DATA_ENPOINT,
}
_missing_settings = [name for name, value in _required_settings.items() if not value]
if _missing_settings:
    raise RuntimeError(
        "Missing required environment variables: " + ", ".join(_missing_settings)
    )

# getOrCreate() reuses a session already running in this kernel; JVM-level settings such as
# spark.jars / spark.jars.packages may then not take effect -- restart the kernel after changing them.
spark = (
    SparkSession.builder
    .appName("MinIOReader")
    # S3A connector; its version should match the Hadoop libraries bundled with PySpark.
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
    .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
    # Path-style addressing (http://host/bucket/key), as typically required by MinIO.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.jars", POSTGRES_JDBC_JAR)
    # Same "spark.hadoop." prefix as the other S3A settings so it reaches the Hadoop configuration.
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

# NOTE(review): adaptive query execution is switched off; the reason is not visible here -- confirm it is still needed.
spark.conf.set("spark.sql.adaptive.enabled", "false")
# NOTE(review): Spark 2.x-style datetime parsing; no parsing happens in the visible cells -- confirm which cell relies on it.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")



25/11/26 19:50:00 WARN Utils: Your hostname, harshithts-HP-Pavilion-Gaming-Laptop-15-ec2xxx resolves to a loopback address: 127.0.1.1; using 192.168.1.4 instead (on interface wlo1)
25/11/26 19:50:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/harshithts/RealTimeData-Pipeline/env/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/harshithts/.ivy2/cache
The jars for the packages stored in: /home/harshithts/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cae12a16-9b26-434a-b931-6570df4d2a1e;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 194ms :: artifacts dl 7ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	-------------------------

In [None]:

# Read the bronze transaction stream.
# An explicit schema is needed for file-based streaming sources (schema inference is off by
# default) and pins the column types. `amount` is an exact decimal rather than a 32-bit float
# so cents survive both the threshold comparisons below and the JDBC write to Postgres.
transaction_schema = StructType([
    StructField("txn_id", StringType()),
    StructField("user_id", StringType()),
    StructField("amount", DecimalType(12, 2)),
    StructField("merchant_id", StringType()),
    StructField("channel", StringType()),
    StructField("location", StringType()),
    StructField("timestamp", StringType()),  # kept as raw text; not parsed in this cell
])

transaction_df = (
    spark.readStream
    .format("json")
    .schema(transaction_schema)
    .load(f"s3a://{BUCKET_NAME}/bronze/transactions-folder/")
)

# Fraud rules.
HIGH_AMOUNT_THRESHOLD = 30000                    # amounts strictly above this count as large
RISK_LOCATIONS = ["Port Shannon", "New Pamela"]  # risky only for Online transactions
SuspiciousMerchant = ["M0008", "M0009"]          # merchant_ids treated as suspicious

# The rules are evaluated top to bottom and the FIRST match wins, so each transaction gets
# exactly one label: e.g. a large Online payment is "HighOnlineAmount", never "HugeAmount".
# NOTE(review): a row whose fields are null (e.g. a malformed JSON line read in the default
# PERMISSIVE mode) matches no rule and is labelled "Normal" -- confirm that is intended.
fraud_df = transaction_df.withColumn(
    "frauds",
    when((col("amount") > HIGH_AMOUNT_THRESHOLD) & (col("channel") == "Online"), "HighOnlineAmount")
    .when(col("merchant_id").isin(SuspiciousMerchant), "SuspiciousMerchant")
    .when((col("channel") == "Online") & col("location").isin(RISK_LOCATIONS), "RiskLocation")
    .when(col("amount") > HIGH_AMOUNT_THRESHOLD, "HugeAmount")
    .otherwise("Normal")
)

# `frauds` is never null (otherwise() always supplies a label), so these two filters
# partition the stream: every row goes to exactly one of the two sinks.
fraud_illegal = fraud_df.filter(col("frauds") != "Normal")  # flagged by at least one rule
genuine_df = fraud_df.filter(col("frauds") == "Normal")     # matched no rule



# Writing to Postgres.
# Both sinks use identical JDBC settings and differ only in the target table, so the write
# lives in one helper and the two foreachBatch callbacks just choose the table.
def _append_batch_to_postgres(batch_df, table):
    """Append one micro-batch to a Postgres table over JDBC.

    Args:
        batch_df: the micro-batch DataFrame passed to a ``foreachBatch`` callback.
        table: target table name, e.g. ``"public.FraudTransaction"``.

    NOTE(review): ``foreachBatch`` gives at-least-once guarantees; if a batch is retried
    after a partial failure its rows are appended again. Deduplicate on ``txn_id``
    downstream (or use the batch id) if duplicates matter.
    """
    (
        batch_df.write
        .format("jdbc")
        .option("url", DATA_ENPOINT)
        .option("dbtable", table)
        .option("user", POSTGRES_USER)
        .option("password", POSTGRES_PASSWORD)
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save()
    )


def writeFraud(batch_df, batch_id):
    """foreachBatch callback: append flagged transactions to ``public.FraudTransaction``.

    ``batch_id`` is unused; it is part of the foreachBatch callback signature.
    """
    _append_batch_to_postgres(batch_df, "public.FraudTransaction")


def writeGenuine(batch_df, batch_id):
    """foreachBatch callback: append unflagged transactions to ``public.GenuineTransaction``.

    ``batch_id`` is unused; it is part of the foreachBatch callback signature.
    """
    _append_batch_to_postgres(batch_df, "public.GenuineTransaction")


25/11/26 19:50:06 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [None]:
# Start the streaming queries: three independent queries over the same source. Each has its
# own checkpoint, so each tracks its progress separately and reads the input files itself.

# fraud → postgres
fraud_query = (
    fraud_illegal.writeStream
    .foreachBatch(writeFraud)
    .outputMode("append")
    .option("checkpointLocation", f"s3a://{BUCKET_NAME}/checkpoint/fraud_write")
    .start()
)

# Genuine → postgres
genuine_query = (
    genuine_df.writeStream
    .foreachBatch(writeGenuine)
    .outputMode("append")
    .option("checkpointLocation", f"s3a://{BUCKET_NAME}/checkpoint/genuine_write")
    .start()
)

# fraud → console (for eyeballing while developing)
console_query = (
    fraud_illegal.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", f"s3a://{BUCKET_NAME}/checkpoint/fraud_console")
    .start()
)

# Block until any query stops. Without the finally clause, interrupting the kernel (or one
# query failing) leaves the remaining queries running in the background and still writing
# to Postgres. So stop every query started here before the interrupt/error propagates.
try:
    spark.streams.awaitAnyTermination()
finally:
    for query in (fraud_query, genuine_query, console_query):
        if query.isActive:
            query.stop()


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------


[Stage 4:>                (0 + 12) / 12][Stage 5:>                 (0 + 0) / 12]

+--------------+-------+--------+-----------+-------+-------------------+-------------------+------------------+
|        txn_id|user_id|  amount|merchant_id|channel|           location|          timestamp|            frauds|
+--------------+-------+--------+-----------+-------+-------------------+-------------------+------------------+
|T1764166756618|  U0002|35084.64|      M0004|    ATM|South Meghanborough|2025-11-26 19:49:16|        HugeAmount|
|T1764166779675|  U0009|38379.22|      M0007|    POS|   Robertsonborough|2025-11-26 19:49:39|        HugeAmount|
|T1764166752608|  U0002| 10525.8|      M0008|    POS|   West Alisonmouth|2025-11-26 19:49:12|SuspiciousMerchant|
|T1764166761630|  U0003|14767.65|      M0009|    POS|    West Racheltown|2025-11-26 19:49:21|SuspiciousMerchant|
|T1764166770654|  U0009|34780.93|      M0010|    POS|    South Hollyside|2025-11-26 19:49:30|        HugeAmount|
|T1764166803740|  U0010|31723.24|      M0003|    ATM|    South Bryanbury|2025-11-26 19:50:03|   

                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------+-------+--------+-----------+-------+----------+-------------------+------------------+
|        txn_id|user_id|  amount|merchant_id|channel|  location|          timestamp|            frauds|
+--------------+-------+--------+-----------+-------+----------+-------------------+------------------+
|T1764166811766|  U0010|46659.13|      M0010| Online|Port Alice|2025-11-26 19:50:11|  HighOnlineAmount|
|T1764166808756|  U0007|  1361.9|      M0009| Online| West Troy|2025-11-26 19:50:08|SuspiciousMerchant|
+--------------+-------+--------+-----------+-------+----------+-------------------+------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------+-------+--------+-----------+-------+----------------+-------------------+------------------+
|        txn_id|user_id|  amount|merchant_id|channel|        location| 

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/harshithts/RealTimeData-Pipeline/env/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/harshithts/RealTimeData-Pipeline/env/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 