In [0]:
# Unmount the existing mount point
dbutils.fs.unmount("/mnt/transactions")

# Mounting the storage again
dbutils.fs.mount(
  source="wasbs://transcationdata@creditcardfraud.blob.core.windows.net",
  mount_point="/mnt/transactions",
  extra_configs={"fs.azure.account.key.creditcardfraud.blob.core.windows.net": "7R4IJ/A75KeXF1WhtUYp9GTlugkOHcg9BRLOkj72ee5kGZzIH0lLKPlx72bbX+y0WjAO7VZOjIFj+AStuwvZwQ=="}
)

/mnt/transactions has been unmounted.


True

In [0]:
dbutils.fs.ls("/mnt/transactions/from stream_analytics")

In [0]:
df = spark.read.json('/mnt/transactions/from stream_analytics')
df.printSchema()

root
 |-- Amount: double (nullable = true)
 |-- EventEnqueuedUtcTime: string (nullable = true)
 |-- EventProcessedUtcTime: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- PartitionId: long (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- TransactionId: long (nullable = true)



In [0]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType, IntegerType

schema = StructType([
    StructField("TransactionId", StringType(), True),
    StructField("EventProcessedUtcTime", TimestampType(), True),
    StructField("Amount", FloatType(), True),
    StructField("Timestamp", TimestampType(), True),
    StructField("EventEnqueuedUtcTime", TimestampType(), True),
    StructField("PartitionId", IntegerType(), True),
    StructField("Location", StringType(), True),
])

df = spark.read.json(
    '/mnt/transactions/from stream_analytics',
    schema=schema
)

df.show(10,truncate=False)


+-------------+--------------------------+--------+-------------------+-----------------------+-----------+---------+
|TransactionId|EventProcessedUtcTime     |Amount  |Timestamp          |EventEnqueuedUtcTime   |PartitionId|Location |
+-------------+--------------------------+--------+-------------------+-----------------------+-----------+---------+
|1756         |2024-10-06 15:18:47.759639|12695.79|2024-10-06 17:16:45|2024-10-06 15:16:46.85 |1          |LocationC|
|7423         |2024-10-06 15:18:47.808502|13235.58|2024-10-06 17:16:52|2024-10-06 15:16:54.225|1          |LocationC|
|8396         |2024-10-06 15:18:47.808589|7981.92 |2024-10-06 17:17:00|2024-10-06 15:17:01.413|1          |LocationC|
|1234         |2024-10-06 15:18:47.808626|10005.24|2024-10-06 17:17:14|2024-10-06 15:17:15.863|1          |LocationB|
|2231         |2024-10-06 15:18:47.80864 |1721.27 |2024-10-06 17:17:19|2024-10-06 15:17:20.614|1          |LocationC|
|2113         |2024-10-06 15:18:47.808674|1135.0  |2024-

In [0]:
df.describe().show()
#stats summary

+-------+------------------+-----------------+------------------+---------+
|summary|     TransactionId|           Amount|       PartitionId| Location|
+-------+------------------+-----------------+------------------+---------+
|  count|              3114|             3114|              3114|     3114|
|   mean| 5003.541425818882|7434.317845511758|2.5093127809890814|     NULL|
| stddev|2851.6665541279403|4316.426855278001|1.7065688106482713|     NULL|
|    min|                10|            10.55|                 0|LocationA|
|    max|              9997|         14998.71|                 5|LocationC|
+-------+------------------+-----------------+------------------+---------+



In [0]:
from pyspark.sql.functions import col, count, when
#counting nulls and there are null values
df.select([
    count(when(col(c).isNull(), c)).alias(c) 
    for c in df.columns
]).show()

+-------------+---------------------+------+---------+--------------------+-----------+--------+
|TransactionId|EventProcessedUtcTime|Amount|Timestamp|EventEnqueuedUtcTime|PartitionId|Location|
+-------------+---------------------+------+---------+--------------------+-----------+--------+
|            0|                    0|     0|        0|                   0|          0|       0|
+-------------+---------------------+------+---------+--------------------+-----------+--------+



In [0]:

# Remove duplicates
df_cleaned = df.dropDuplicates()


In [0]:
from pyspark.sql.functions import unix_timestamp, hour, col, when

# Create a new column for the hour of the transaction
df_features = df_cleaned.withColumn("TransactionHour", hour(col("Timestamp")))

# Flag high-value transactions
df_features = df_features.withColumn("HighValueFlag", when(col("Amount") > 1000, 1).otherwise(0))

display(df_features)

In [0]:
from pyspark.sql.functions import sum, avg, count

# Cast columns to appropriate numeric types
df_features = df_features.withColumn("Amount", df_features["Amount"].cast("double"))
df_features = df_features.withColumn("HighValueFlag", df_features["HighValueFlag"].cast("int"))

# Group by user or location to analyze total amounts
df_aggregated = df_features.groupBy("Location").agg(
    sum("Amount").alias("TotalAmount"),
    avg("Amount").alias("AverageAmount"),
    count("TransactionId").alias("TransactionCount"),
    sum("HighValueFlag").alias("HighValueCount")
)

display(df_aggregated)

Location,TotalAmount,AverageAmount,TransactionCount,HighValueCount
LocationB,7672438.885081291,7507.278752525725,1022,949
LocationC,7784807.618185043,7385.965482148998,1054,978
LocationA,7693219.26765728,7411.579255931869,1038,974


In [0]:
# Identifyng transactions that are significantly higher than average
avg_amount = df_features.agg(avg("Amount")).first()[0]
threshold = avg_amount * 2  # Example threshold

df_anomalies = df_features.filter(col("Amount") > threshold)
display(df_anomalies)


TransactionId,EventProcessedUtcTime,Amount,Timestamp,EventEnqueuedUtcTime,PartitionId,Location,TransactionHour,HighValueFlag
645,2024-10-06T14:47:03.556466Z,14944.7802734375,2024-10-06T16:45:33Z,2024-10-06T14:45:35.223Z,0,LocationB,16,1
559,2024-10-06T14:53:20.375064Z,14926.259765625,2024-10-06T16:52:30Z,2024-10-06T14:52:32.113Z,5,LocationA,16,1
8474,2024-10-06T15:25:06.103639Z,14897.9697265625,2024-10-06T17:24:15Z,2024-10-06T15:24:17.234Z,0,LocationC,17,1
5405,2024-10-06T14:51:15.72412Z,14892.599609375,2024-10-06T16:49:23Z,2024-10-06T14:49:25.088Z,3,LocationA,16,1
9620,2024-10-06T14:57:33.925407Z,14909.849609375,2024-10-06T16:55:46Z,2024-10-06T14:55:47.649Z,1,LocationB,16,1
6138,2024-10-06T14:49:09.225549Z,14875.330078125,2024-10-06T16:48:51Z,2024-10-06T14:48:53.293Z,4,LocationA,16,1
741,2024-10-01T17:37:17.497008Z,14998.7099609375,2024-10-01T19:35:08Z,2024-10-01T17:35:10.852Z,0,LocationB,19,1
1517,2024-10-06T15:33:17.908532Z,14993.01953125,2024-10-06T17:31:19Z,2024-10-06T15:31:21.021Z,4,LocationA,17,1
5758,2024-10-14T20:18:02.117719Z,14984.83984375,2024-10-14T22:17:22Z,2024-10-14T20:17:28.479Z,4,LocationC,22,1
3311,2024-10-14T20:23:00.50507Z,14948.2197265625,2024-10-14T22:22:35Z,2024-10-14T20:22:40.965Z,2,LocationA,22,1


In [0]:
display(df_aggregated)  # visualization


Location,TotalAmount,AverageAmount,TransactionCount,HighValueCount
LocationB,7672438.885081291,7507.278752525725,1022,949
LocationC,7784807.618185043,7385.965482148998,1054,978
LocationA,7693219.26765728,7411.579255931869,1038,974


Databricks visualization. Run in Databricks to view.

In [0]:
# Saved transformed DataFrame for further analysis
df_features.write.mode("overwrite").csv("/mnt/transactions/transformed_data")


In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Prepare data for modeling
assembler = VectorAssembler(inputCols=["Amount", "TransactionHour", "HighValueFlag"], outputCol="features")
df_model = assembler.transform(df_features)

# Split into training and testing sets
train_data, test_data = df_model.randomSplit([0.8, 0.2], seed=42)

# Train a logistic regression model
lr = LogisticRegression(featuresCol="features", labelCol="HighValueFlag")
model = lr.fit(train_data)

# Make predictions
predictions = model.transform(test_data)


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="HighValueFlag")
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy}")


Model Accuracy: 1.0
