In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, count, col, when, lit, unix_timestamp, to_timestamp

In [None]:
# Read the enriched-transactions CSV into pandas. When the file is missing the
# variable is set to None so that later cells can detect the failure and skip
# their work instead of raising a NameError.
try:
    df_enriched_transactions = pd.read_csv('/data/ex2-df_enriched_transaction.csv')
except FileNotFoundError:
    print("Error: The file 'ex2-df_enriched_transaction.csv' was not found.")
    df_enriched_transactions = None
else:
    print("Pandas DataFrame loaded successfully.")

Pandas DataFrame loaded successfully.


In [None]:
# Only start Spark when the pandas load in the previous cell succeeded.
if df_enriched_transactions is None:
    print("Could not proceed with Spark operations due to file loading error.")
else:
    # Obtain the session for this notebook under the "FraudDetection" app name.
    spark = (
        SparkSession.builder
        .appName("FraudDetection")
        .getOrCreate()
    )
    print("SparkSession initialized successfully.")

    # Hand the in-memory pandas frame over to Spark.
    sdf_enriched_transactions = spark.createDataFrame(df_enriched_transactions)
    print("PySpark DataFrame created successfully.")

    # Inspect the result: column types first, then a small sample of rows.
    print("PySpark DataFrame Schema:")
    sdf_enriched_transactions.printSchema()

    print("\nFirst 5 rows of the PySpark DataFrame:")
    sdf_enriched_transactions.show(n=5)

SparkSession initialized successfully.
PySpark DataFrame created successfully.
PySpark DataFrame Schema:
root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- transaction_hour: long (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- registration_date: string (nullable = true)
 |-- customer_tier: string (nullable = true)
 |-- last_login_date: string (nullable = true)
 |-- is_fraudulent_rule1: boolean (nullable = true)


First 5 rows of the PySpark DataFrame:
+--------------+-----------+------+-------------------+--------+------------+----------------+-------------+-------------------+-----------------+-------------+---------------+-------------------+
|transaction_id|customer_id|amount|          timestamp|currency|  ip_add

In [None]:
# `to_date` is needed below but is not part of the notebook's top-level
# imports, so bring it in here together with `datediff`.
from pyspark.sql.functions import datediff, to_date

# Rule 1 (Spark version): flag a transaction when it happened fewer than
# 7 days after the customer's registration AND its amount is above 500.
try:
    # Add a date-only copy of the transaction timestamp and replace the
    # registration date string with a real date so datediff can compare them.
    sdf_enriched_transactions = (
        sdf_enriched_transactions
        .withColumn("timestamp_to_date", to_date(col("timestamp")))
        .withColumn("registration_date", to_date(col("registration_date")))
    )
except Exception as e:
    print(f"Error converting dates: {e}")
    # Everything below depends on the converted columns; continuing would only
    # fail later with a less helpful error, so surface the original one.
    raise

# datediff(end, start) is the number of days from `start` to `end`.
new_customer = datediff(col("timestamp_to_date"), col("registration_date")) < 7
high_transaction = col("amount") > 500

# when/otherwise yields a strict True/False flag even if either operand is null.
sdf_enriched_transactions = sdf_enriched_transactions.withColumn(
    'is_fraudulent_rule1_spark',
    when(new_customer & high_transaction, True).otherwise(False),
)
sdf_enriched_transactions.show()

+--------------+-----------+------+-------------------+--------+------------+----------------+-------------+-------------------+-----------------+-------------+---------------+-------------------+-----------------+-------------------------+
|transaction_id|customer_id|amount|          timestamp|currency|  ip_address|transaction_hour|customer_name|     customer_email|registration_date|customer_tier|last_login_date|is_fraudulent_rule1|timestamp_to_date|is_fraudulent_rule1_spark|
+--------------+-----------+------+-------------------+--------+------------+----------------+-------------+-------------------+-----------------+-------------+---------------+-------------------+-----------------+-------------------------+
|         TX001|       C101|150.75|2024-07-16 10:00:00|     USD|192.168.1.10|              10|  Alice Smith|  alice@example.com|       2023-01-15|         Gold|     2024-07-15|              false|       2024-07-16|                    false|
|         TX002|       C102|  25.0|2

I also tried to express the rule as a reusable Python function (a helper that builds Spark column expressions, not a registered UDF):

```
def is_fraudulent_rule1(amount, registration_day, latest_transaction_day):
    """Build the Rule 1 fraud flag as a Spark Column expression.

    A transaction is flagged when its amount is above 500 and it took place
    fewer than 7 days after the customer's registration date.

    Args:
        amount: Column holding the transaction amount.
        registration_day: Column holding the customer's registration date
            (date, timestamp, or a string `to_date` can parse).
        latest_transaction_day: Column holding the transaction timestamp/date.

    Returns:
        A boolean Column expression; nothing is evaluated until it is used in
        a DataFrame action.
    """
    from pyspark.sql.functions import datediff, to_date

    # Building Column expressions is lazy, so there is nothing to catch here:
    # values that cannot be parsed become null when the query runs.
    registration_day = to_date(registration_day)
    latest_transaction_day = to_date(latest_transaction_day)

    high_transaction = amount > 500
    # datediff(end, start) returns end - start in days, so the transaction
    # date must come first; with the arguments swapped the result is negative
    # for every existing customer and the "< 7" test is always true.
    new_customer = datediff(latest_transaction_day, registration_day) < 7
    return high_transaction & new_customer


sdf_enriched_transactions.withColumn(
    'is_fraudulent_rule1',
    is_fraudulent_rule1(
        sdf_enriched_transactions.amount,
        sdf_enriched_transactions.registration_date,
        sdf_enriched_transactions.timestamp,
    ),
).show(5)
```



In [None]:
ds = sdf_enriched_transactions  # short alias used throughout this cell

# Window answering "how many transactions did this customer make in the last
# 10 minutes?": rows of the same customer whose timestamp, converted to epoch
# seconds, lies within [current - 600, current].
windowSpec_10min = (
    Window.partitionBy(ds.customer_id)
    .orderBy(unix_timestamp(ds.timestamp))
    .rangeBetween(-600, 0)
)

# Window for looking one transaction back: same customer, ordered by timestamp.
windowSpec_lag = Window.partitionBy(ds.customer_id).orderBy(ds.timestamp)

# IP address of the customer's previous transaction (null for the first one).
ds_with_prev = ds.withColumn(
    "prev_ip_address",
    lag(ds.ip_address).over(windowSpec_lag),
)

# Number of the customer's transactions inside the 10-minute window; the
# current row is part of the frame and therefore counted.
ds_with_counts = ds_with_prev.withColumn(
    "transaction_count_10min",
    count("*").over(windowSpec_10min),
)

# Rule 2: multiple transactions in a short period from different locations.
# Flag when the customer has more than 3 transactions in the window and the
# IP address differs from the previous transaction's. Both addresses must be
# present for the comparison to count.
burst_of_transactions = col("transaction_count_10min") > 3
ip_changed = (
    col("ip_address").isNotNull()
    & col("prev_ip_address").isNotNull()
    & (col("ip_address") != col("prev_ip_address"))
)
sdf_rule2 = ds_with_counts.withColumn(
    "is_fraudulent_rule2",
    when(burst_of_transactions & ip_changed, lit(True)).otherwise(lit(False)),
)

# Combined flag: either rule fired. Rule 1 is taken from the
# `is_fraudulent_rule1` column that came with the input data; to use the flag
# recomputed in Spark instead, swap in `is_fraudulent_rule1_spark`.
sdf_final = sdf_rule2.withColumn(
    "is_fraudulent_combined",
    col("is_fraudulent_rule1") | col("is_fraudulent_rule2"),
)

# Show the key columns of every transaction flagged by at least one rule.
report_columns = [
    "transaction_id",
    "customer_id",
    "timestamp",
    "ip_address",
    "is_fraudulent_rule1",     # Rule 1 flag from the input data
    "is_fraudulent_rule2",
    "is_fraudulent_combined",  # combined flag, shown for clarity
]
print("\nTransactions flagged by either Rule 1 or Rule 2:")
flagged = sdf_final.filter(col("is_fraudulent_combined"))
flagged.select(*report_columns).show(truncate=False)


Transactions flagged by either Rule 1 or Rule 2:
+--------------+-----------+-------------------+------------+-------------------+-------------------+----------------------+
|transaction_id|customer_id|timestamp          |ip_address  |is_fraudulent_rule1|is_fraudulent_rule2|is_fraudulent_combined|
+--------------+-----------+-------------------+------------+-------------------+-------------------+----------------------+
|TX011         |C101       |2024-07-16 10:10:00|192.168.1.10|false              |true               |true                  |
+--------------+-----------+-------------------+------------+-------------------+-------------------+----------------------+



In [124]:
import os

output_path = "/data/sdf_final_transaction.csv"

# Tell the user when something already lives at the target path. The write
# below replaces it; a production job might rename, append, or ask for
# confirmation instead.
already_there = os.path.exists(output_path)
if already_there:
    print(f"File already exists at {output_path}. Overwriting.")

# Write the final DataFrame as CSV with a header row, replacing previous output.
sdf_final.write.mode("overwrite").csv(output_path, header=True)

print(f"DataFrame saved as {output_path}")

File already exists at /data/sdf_final_transaction.csv. Overwriting.
DataFrame saved as /data/sdf_final_transaction.csv


### Part 2: Conceptual Questions 🤔

#### 2.1 Why PySpark?

Moving from Pandas to PySpark for a fraud detection system, especially one handling millions of transactions per hour, is driven by the fundamental limitations of Pandas and the architectural advantages of Spark:

*   **Pandas Limitations:**
    *   **Single-Node Processing:** Pandas operates on data that fits into the memory of a single machine. For large datasets that exceed available RAM, Pandas will fail or become extremely slow.
    *   **No Built-in Distributed Processing:** Pandas is not designed for distributed computing. It cannot easily scale across multiple nodes in a cluster to process data in parallel.
    *   **Limited Fault Tolerance:** If the single machine running a Pandas script fails, the entire process stops, and you lose your progress.

*   **Spark (PySpark) Advantages:**
    *   **Distributed Processing:** Spark is designed to process data across a cluster of machines. It breaks down large datasets and computations into smaller tasks that can be executed in parallel on different nodes, enabling it to handle massive amounts of data that would be impossible with Pandas.
    *   **Fault Tolerance:** Spark is resilient to failures. If a node in the cluster fails during computation, Spark can recompute the lost partitions of data on other available nodes, ensuring the job completes without losing data.
    *   **Lazy Evaluation:** Spark uses lazy evaluation, meaning transformations (like `filter`, `select`, `withColumn`) are not executed immediately. Instead, they build a Directed Acyclic Graph (DAG) of operations. The computation only happens when an action (like `show`, `count`, `collect`) is called. This allows Spark to optimize the execution plan for efficiency.
    *   **In-Memory Computing:** Spark can perform computations in memory across the cluster, which is significantly faster than disk-based processing, although it can also spill to disk when data exceeds memory capacity.
    *   **Unified Engine:** Spark provides a unified engine for various workloads, including SQL, batch processing, streaming, machine learning, and graph processing, all with a consistent API.

For a real-time transaction fraud detection system with a high volume of transactions, PySpark's ability to handle large datasets, distribute processing, and provide fault tolerance is essential, whereas Pandas would quickly become a bottleneck.

#### 2.2 Performance Tuning in Spark

If a PySpark job is running slowly, here are two optimization techniques:

1.  **Data Partitioning:**
    *   **Description:** Data in Spark DataFrames is divided into partitions, which are logical chunks of data distributed across the nodes in the cluster. The number and size of partitions significantly impact performance. Too few partitions limit parallelism and produce oversized partitions that may spill to disk or run out of memory, while unevenly sized partitions (data skew) leave a few slow tasks holding up the whole stage. Too many partitions lead to excessive scheduling overhead.
    *   **Improvement:** By repartitioning the DataFrame (using `repartition()` to increase or rebalance partitions through a full shuffle, or `coalesce()` to reduce the number of partitions without a full shuffle), you can control the number of partitions and keep data evenly distributed. This helps to minimize data skew and allows Spark to utilize all available cores effectively for parallel processing, speeding up shuffle-heavy operations like joins and aggregations. Choosing a suitable partitioning strategy (e.g., partitioning by a key used in joins) can also improve performance by reducing data movement during shuffles.

2.  **Broadcasting Small DataFrames/Lookups:**
    *   **Description:** When performing a join between a large DataFrame and a small DataFrame, Spark typically uses a shuffle hash join or sort merge join, which involves shuffling data across the network. Broadcasting is a technique where a small DataFrame is sent to all executor nodes and stored in memory.
    *   **Improvement:** Broadcasting avoids the expensive shuffle operation for the large DataFrame during a join. Instead of sending partitions of the large DataFrame to nodes where the corresponding small DataFrame data resides, the small DataFrame is readily available on all nodes. This significantly reduces network traffic and improves join performance, especially when joining a large fact table with smaller dimension tables. You can hint Spark to broadcast a DataFrame using `pyspark.sql.functions.broadcast()`.

#### 2.3 Role of Azure Databricks

Azure Databricks simplifies the work done in this exercise in several ways:

1.  **Managed Infrastructure:** Azure Databricks provides a fully managed Spark environment. You don't need to manually set up, configure, or manage Spark clusters, including setting up master and worker nodes, installing dependencies, and handling cluster scaling. Databricks handles the underlying infrastructure, allowing you to focus on writing Spark code.
2.  **Collaborative Notebooks and Environment:** Databricks offers a collaborative notebook environment similar to Colab but specifically optimized for Spark. It provides integrated support for various languages (Python, Scala, SQL, R), version control integration, and the ability to share notebooks and collaborate with team members seamlessly. The environment is pre-configured with necessary libraries and optimized Spark distributions.
3.  **Simplified Job Scheduling and Management:** While you can schedule Spark jobs manually or with external orchestrators, Databricks provides built-in job scheduling capabilities. You can easily schedule notebooks or JARs to run periodically or based on triggers, monitor job runs, and set up alerts within the Databricks platform. This simplifies the operational aspect of running production data pipelines compared to managing scheduling separately.
4.  **Integration with Azure Services:** Databricks is deeply integrated with other Azure services. This allows easy access to data stored in Azure Data Lake Storage, Azure Blob Storage, Azure SQL Database, etc., and facilitates building end-to-end data pipelines within the Azure ecosystem. Integration with services like Azure Active Directory also simplifies security and access control.