In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 5.0 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=7a37b15a95427c22ef07634e7e245e514335dbd92d7bc57e1a3d8676249509e5
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [7]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from google.colab import drive
# Mount Google Drive at /content/drive so files stored on Drive are reachable from this runtime.
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Create the SparkSession for this notebook (or reuse one that already exists);
# the application is registered under the name 'Bank'.
spark = (
    SparkSession.builder
    .appName('Bank')
    .getOrCreate()
)

In [31]:
# Explicit schema for the incoming JSON records. Streaming file sources need the
# schema up front; all four fields are nullable.
userSchema = StructType([
    StructField("Acc No", IntegerType(), True),
    StructField("Transactions", StringType(), True),
    StructField("time", StringType(), True),
    StructField("Amount", IntegerType(), True),
])

In [32]:
# Streaming reader over the JSON files in the Drive data folder, parsed with userSchema.
# "MaxFilesPerTrigger" = 1 limits each micro-batch to at most one new file.
df = (
    spark.readStream
    .schema(userSchema)
    .option("MaxFilesPerTrigger", 1)
    .json("drive/My Drive/Colab Notebooks/Data/")
)

In [33]:
# Use a single shuffle partition for the aggregations below
# (presumably because the data set is small — confirm before scaling up).
spark.conf.set(
    "spark.sql.shuffle.partitions",
    1,
)

In [34]:
# Parse the string column "time" into a timestamp column "TimeStamp" and then
# discard the raw string column.
# NOTE(review): this rebinds df, so re-running the cell fails because "time"
# has already been dropped.
df = (
    df
    .withColumn("TimeStamp", to_timestamp(col("time")))
    .drop("time")
)

In [35]:
# Number of transactions per account ("Acc No").
account_transaction_count = (
    df
    .select("Acc No")
    .groupBy("Acc No")
    .count()
)

In [37]:
# Start a streaming query that publishes the complete per-account count table to the
# in-memory table "account_transactionCount_time2", triggered every 20 seconds.
account_query = (
    account_transaction_count.writeStream
    .queryName("account_transactionCount_time2")
    .format("memory")
    .outputMode("complete")
    .trigger(processingTime="20 seconds")
    .start()
)

In [38]:
# Snapshot of the in-memory table fed by the per-account count query.
spark.sql(
    "SELECT * FROM account_transactionCount_time2"
).show()

+------+-----+
|Acc No|count|
+------+-----+
|     3|   29|
|     2|   17|
|     5|   21|
|     1|   18|
|     4|   15|
+------+-----+



In [40]:
# Transactions whose Amount is below 2000. Rows are only filtered here, not
# aggregated, despite the "_count" suffix of the variable name.
account_amount_condition_count = df.where(col("Amount") < 2000)

# Append each matching row to the in-memory table "Amount2".
query = (
    account_amount_condition_count.writeStream
    .queryName("Amount2")
    .format("memory")
    .outputMode("append")
    .start()
)

In [41]:
# Snapshot of the rows collected so far in the in-memory table "Amount2".
spark.sql(
    "SELECT * FROM Amount2"
).show()

+------+------------+------+--------------------+
|Acc No|Transactions|Amount|           TimeStamp|
+------+------------+------+--------------------+
|     3|           d|  1497|2023-12-06 17:27:...|
|     1|           d|   860|2023-12-06 17:27:...|
|     5|           d|   270|2023-12-06 17:27:...|
|     3|           d|   181|2023-12-06 17:27:...|
|     1|           c|  1374|2023-12-06 17:27:...|
|     1|           d|   950|2023-12-06 17:27:...|
|     3|           c|  1159|2023-12-06 17:27:...|
|     2|           c|   272|2023-12-06 17:27:...|
|     5|           c|  1897|2023-12-06 17:27:...|
|     4|           d|   446|2023-12-06 17:27:...|
|     5|           d|  1759|2023-12-06 17:27:...|
|     3|           c|  1265|2023-12-06 17:27:...|
|     5|           d|   833|2023-12-06 17:27:...|
|     3|           d|  1808|2023-12-06 17:27:...|
|     4|           d|    45|2023-12-06 17:28:...|
|     3|           c|   482|2023-12-06 17:28:...|
|     5|           c|   240|2023-12-06 17:28:...|


In [42]:
# Per-account number of transactions whose Amount is below 2000.
account_amount_condition_count = (
    df
    .where(col("Amount") < 2000)
    .groupBy('Acc No')
    .count()
)

In [44]:
# Publish the complete per-account count (Amount < 2000) to the in-memory table "Amount3".
query = (
    account_amount_condition_count.writeStream
    .queryName("Amount3")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [45]:
# Snapshot of the in-memory table "Amount3".
spark.sql(
    "SELECT * FROM Amount3"
).show()

+------+-----+
|Acc No|count|
+------+-----+
|     3|   40|
|     2|   46|
|     5|   49|
|     1|   34|
|     4|   47|
+------+-----+



In [50]:
from pyspark.sql.window import Window

In [55]:
#Get count of transactions per account and event timestamp, with a 10 minute watermark
# NOTE(review): grouping on the raw "TimeStamp" column counts rows per exact timestamp
# value; if 10-minute buckets were intended, the grouping key would need to be a
# time window over "TimeStamp" instead — confirm the intent.
accountDF = (
    df
    .withWatermark("TimeStamp", "10 minutes")
    .groupBy(col("Acc No"), "TimeStamp")
    .count()
)

# This query uses checkpoint and output directories of its own. A checkpoint directory
# records the progress and state of exactly one streaming query, so it must not be
# shared with any other query writing CSV from this notebook.
query = (
    accountDF.writeStream
    .queryName("timeStamp")
    .format("csv")
    .option("checkpointLocation", "drive/My Drive/Colab Notebooks/checkpoint_timeStamp")
    .option("path", "drive/My Drive/Colab Notebooks/output_timeStamp")
    .outputMode("append")
    .start()
)

In [59]:
# Transactions with Amount above 5000, with a 10 minute watermark declared on "TimeStamp".
# NOTE(review): this query has no aggregation or join, so the watermark presumably
# has no visible effect on the rows emitted here — confirm.
accountDF = (
    df
    .withWatermark("TimeStamp", "10 minutes")
    .where(col("Amount") > 5000)
)

# Append each matching row to the in-memory table "amount_time2".
query = (
    accountDF.writeStream
    .queryName("amount_time2")
    .format("memory")
    .outputMode("append")
    .start()
)

In [60]:
# Snapshot of the rows collected so far in the in-memory table "amount_time2".
spark.sql(
    "SELECT * FROM amount_time2"
).show()

+------+------------+------+--------------------+
|Acc No|Transactions|Amount|           TimeStamp|
+------+------------+------+--------------------+
|     3|           d|  8229|2023-12-06 17:27:...|
|     1|           d|  9460|2023-12-06 17:27:...|
|     2|           d|  5278|2023-12-06 17:27:...|
|     5|           c|  9662|2023-12-06 17:27:...|
|     3|           c|  9050|2023-12-06 17:27:...|
|     1|           d|  9303|2023-12-06 17:27:...|
|     4|           d|  6609|2023-12-06 17:27:...|
|     2|           c|  9474|2023-12-06 17:27:...|
|     4|           d|  5065|2023-12-06 17:27:...|
|     3|           c|  8339|2023-12-06 17:27:...|
|     5|           d|  5718|2023-12-06 17:27:...|
|     1|           d|  6349|2023-12-06 17:27:...|
|     4|           c|  8567|2023-12-06 17:27:...|
|     2|           c|  6873|2023-12-06 17:27:...|
|     5|           d|  5119|2023-12-06 17:27:...|
|     4|           c|  7081|2023-12-06 17:27:...|
|     4|           d|  5365|2023-12-06 17:27:...|


In [62]:
#Process data with a 10 minute watermark and store transactions above 5000 as CSV files
accountDF = (
    df
    .withWatermark("TimeStamp", "10 minutes")
    .where(col("Amount") > 5000)
)

# This query uses checkpoint and output directories of its own. A checkpoint directory
# records the progress of exactly one streaming query, so it must not be shared with
# any other CSV-writing query in this notebook (their plans and output schemas differ).
query = (
    accountDF.writeStream
    .queryName("name")
    .format("csv")
    .option("checkpointLocation", "drive/My Drive/Colab Notebooks/checkpoint_high_amount")
    .option("path", "drive/My Drive/Colab Notebooks/output_high_amount")
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .start()
)

In [None]:
#To show output in console
# The query gets its own name: Spark refuses to start a query whose name matches a
# query that is still active, and "account_transactionCount_time2" is already used
# by the in-memory query started earlier.
account_console_query = (
    account_transaction_count.writeStream
    .queryName("account_transactionCount_console")
    .format("console")
    .outputMode("complete")
    .trigger(processingTime="20 seconds")
    .start()
)

# Keep the handle above so the query can still be stopped, then block this cell
# until the query terminates.
account_console_query.awaitTermination()