# PySpark

In [None]:
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, year
from pyspark.sql.types import IntegerType

# Build the Spark session used by the rest of the notebook, with the S3A
# connector configured for reading from S3.
import os

# Take the AWS credentials from the environment rather than embedding them in
# the notebook. The literal placeholders are only a fallback so the cell still
# runs unchanged when the variables are not exported; do not paste real keys
# here.
s3_access_key = os.environ.get("AWS_ACCESS_KEY_ID", "ACCESS_KEY")
s3_secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "SECRET_KEY")

# NOTE(review): the S3A option documented for the signing region is
# `fs.s3a.endpoint.region`; confirm that `fs.s3a.region` below has any effect.
spark = (
    SparkSession.builder.appName("MonitorAnalysis")
    # hadoop-aws and the AWS SDK bundle provide the s3a:// filesystem.
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1,com.amazonaws:aws-java-sdk-bundle:1.11.901")
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key)
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.metastore.metrics.enabled", "false")
    .config("spark.hadoop.io.native.lib.available", "false")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .config("spark.hadoop.fs.s3a.region", "ap-south-1")
    .getOrCreate()
)

**OS Setup**

In [None]:
import os

# Export AWS settings for libraries that read them from the environment.
# setdefault keeps credentials that are already exported (shell profile,
# secrets manager, CI) instead of overwriting them with the notebook's
# placeholder values; the placeholders apply only when nothing is set.
os.environ.setdefault('AWS_ACCESS_KEY_ID', 'ACCESS_KEY')
os.environ.setdefault('AWS_SECRET_ACCESS_KEY', 'SECRET_KEY')
# The region is not a secret and this notebook always targets ap-south-1.
os.environ['AWS_DEFAULT_REGION'] = 'ap-south-1'

**Boto Setup**

In [None]:
pip install boto3

Boto

In [None]:
import boto3

# Create a boto3 session for the ap-south-1 region. No keys are passed here:
# boto3 resolves credentials from its default chain, which includes the
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables set earlier
# in this notebook, so the secrets are not repeated in this cell.
session = boto3.Session(region_name='ap-south-1')

# Handles to the DynamoDB tables used below: customer records are read from
# 'customers' and flagged transactions are written to 'anomaly'.
dynamodb = session.resource('dynamodb')
table = dynamodb.Table('customers')
anomaly_table = dynamodb.Table('anomaly')

# Retail Mart Monitor

In [4]:
# Read every item from the customers table. A single Scan call returns at most
# one page of results and signals that more remain via 'LastEvaluatedKey', so
# keep scanning from that key until it is no longer present.
response = table.scan()
cust_data = list(response['Items'])
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    cust_data.extend(response['Items'])

In [5]:
# Build a Spark DataFrame from the scanned DynamoDB items. No schema is
# passed, so Spark infers the columns and types from the items themselves.
cust_df = spark.createDataFrame(data=cust_data)

In [7]:
# Read the sales CSV from S3: the first row supplies the column names and
# Spark infers each column's type from the data.
sales_input = "s3a://this-is-my-bucket007/sales_data.csv"
sales_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(sales_input)
)


In [8]:
# Preview both inputs: print a heading, then the first rows of each frame.
for heading, frame in (
    ("Customer Data from DynamoDB:", cust_df),
    ("Transaction Data from S3:", sales_df),
):
    print(heading)
    frame.show()

Customer Data from DynamoDB:
+-----------+-------------+--------------+
|customer_id|customer_name|      inactive|
+-----------+-------------+--------------+
|       C125|        manoj|{BOOL -> true}|
|       C126|          Leo|{BOOL -> true}|
|       C123|         John|{BOOL -> true}|
|       C124|        brock|{BOOL -> true}|
+-----------+-------------+--------------+

Transaction Data from S3:
+-----------+--------------+----------+-------------+---------------+
|customer_id|transaction_id|product_id|purchase_date|purchase_amount|
+-----------+--------------+----------+-------------+---------------+
|       C123|        TXN001|      P001|   2023-01-05|         150.75|
|       C124|        TXN002|      P003|   2023-01-10|          200.5|
|       C123|        TXN003|      P002|   2023-02-15|          300.0|
|       C125|        TXN004|      P004|   2023-03-10|         120.25|
|       C123|        TXN005|      P001|   2023-03-20|          450.0|
|       C124|        TXN006|      P003| 

In [9]:
from pyspark.sql.functions import avg

In [10]:
# Mean purchase amount per customer, i.e. the customer's average spend per
# transaction, exposed as the "avg_spending" column.
purchases_by_customer = sales_df.groupBy("customer_id")
mean_purchase = avg("purchase_amount").alias("avg_spending")
avg_spending_df = purchases_by_customer.agg(mean_purchase)

In [11]:
# Attach each customer's average spend to every one of their transactions
# (left join keyed on customer_id, so every sales row is kept).
sales_with_avg_df = sales_df.join(
    other=avg_spending_df,
    on=["customer_id"],
    how="left",
)


In [13]:
from pyspark.sql.functions import when

In [14]:
# A transaction is an anomaly when its amount is strictly greater than the
# customer's average spend multiplied by this factor.
threshold_multiplier = 1.5

spending_limit = col("avg_spending") * threshold_multiplier
exceeds_limit = col("purchase_amount") > spending_limit

# Materialise the flag as a boolean column (False when the comparison is not
# true), then keep only the flagged rows.
flagged_sales_df = sales_with_avg_df.withColumn(
    "is_anomaly",
    when(exceeds_limit, True).otherwise(False),
)
anomalies_df = flagged_sales_df.where(col("is_anomaly"))

In [15]:
# Print a label, then the flagged rows using show()'s default limits
# (first 20 rows, long cell values truncated).
print("Anamolies DF:")
anomalies_df.show(n=20, truncate=True)

Anamolies DF:
+-----------+--------------+----------+-------------+---------------+------------+----------+
|customer_id|transaction_id|product_id|purchase_date|purchase_amount|avg_spending|is_anomaly|
+-----------+--------------+----------+-------------+---------------+------------+----------+
+-----------+--------------+----------+-------------+---------------+------------+----------+



In [16]:
# Collect the flagged transactions to the driver and write one item per
# anomaly to the DynamoDB anomaly table.
from decimal import Decimal

for row in anomalies_df.collect():
    anomaly_table.put_item(
        Item={
            "customer_id": row["customer_id"],
            "transaction_id": row["transaction_id"],
            # boto3's DynamoDB serializer rejects Python floats with a
            # TypeError, so numeric values are sent as Decimal. Going through
            # str() keeps the short decimal form instead of the float's full
            # binary expansion.
            "purchase_amount": Decimal(str(row["purchase_amount"])),
            "avg_spending": Decimal(str(row["avg_spending"])),
            "is_anomaly": row["is_anomaly"],
        }
    )

In [17]:
# Report that the write-back step has finished.
completion_message = "Anomalies flagged and logged in DynamoDB."
print(completion_message)

Anomalies flagged and logged in DynamoDB.


In [None]:
# Stop the Spark session created at the top of the notebook now that the
# analysis is finished.
spark.stop()