In [1]:
#import the SparkSession from pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

In [2]:
#start the spark session
spark = SparkSession.builder \
    .appName("SalesDataStreaming") \
    .getOrCreate()

2024-09-07 11:20:15,310 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-09-07 11:20:16,128 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2024-09-07 11:20:16,128 WARN util.Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
2024-09-07 11:20:16,128 WARN util.Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
2024-09-07 11:20:16,773 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [3]:
from sqlalchemy.engine import create_engine
import pandas as pd

host='localhost' #Since hive is running localy
port=10000 # Default port for HiveServer2
username='maneelcha49dgre'
database='fireBase'

connection_string=f'hive://{username}@{host}:{port}/{database}'

engine=create_engine(connection_string)

### 1. Schema Definition and Data Loading (5 Marks):
##### ● Task: Set up the schema for each dataset using the provided structure.Load the datasets from their respective locations using file-structured streaming.
##### ● Expected Result: Accurate schema definition and successful data loadinginto data frames without errors.

In [4]:
import os 
os.environ['HADOOP_HOME'] = '/opt/hadoop' 
os.environ['PATH'] += os.pathsep + os.path.join(os.environ['HADOOP_HOME'], 'bin') 
!hadoop version

Hadoop 3.3.0
Source code repository https://gitbox.apache.org/repos/asf/hadoop.git -r aa96f1871bfd858f9bac59cf2a81ec470da649af
Compiled by brahma on 2020-07-06T18:44Z
Compiled with protoc 3.7.1
From source with checksum 5dc29b802d6ccd77b262ef9d04d19c4
This command was run using /opt/hadoop/share/hadoop/common/hadoop-common-3.3.0.jar


In [5]:
#put all csv into list
file_list = ['myfiles/transaction_data.csv','myfiles/merchant_data.csv', 'myfiles/account_data.csv', 'myfiles/customer_data.csv', 'myfiles/branch_data.csv.csv']
loading csv files from local to hdfs
#load csv file from local to hdfs
for file in file_list:
    os.system(f'hdfs dfs -put {file} /user/maneelcha49dgre/review_file1/' )
print("successfully loaded")

In [6]:
import pandas as pd
schema = [
    """create table if not exists transaction_data(
    Transaction_ID string,
    Account_ID string,
    merchant_id string,
    Transaction_Date date,
    Processing_Date date,
    Amount double
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    TBLPROPERTIES ("skip.header.line.count" = "1")
    """,
    """create table if not exists merchant_data(
    Merchant_ID string,
    Merchant_Name string ,
    Merchant_Category string, 
    Country string
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    TBLPROPERTIES ("skip.header.line.count" = "1")
    """,
    """create table if not exists account_data(
    Account_ID string,
    Customer_ID string,
    Branch_ID string,
    Account_Type string,
    Balance double
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    TBLPROPERTIES ("skip.header.line.count" = "1")
    """,
    """ create table if not exists customer_data(
    Customer_ID string,
    Customer_Name string,
    Customer_Type string,
    Customer_Country string
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    TBLPROPERTIES ("skip.header.line.count" = "1")
    """,
    """create table if not exists branch_data(
    Branch_ID string,
    Branch_Name string,
    Location string,
    Manager string
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    TBLPROPERTIES ("skip.header.line.count" = "1")
    """
]

for tables in schema:
    with engine.connect() as connection:
        connection.execute(tables)

In [7]:
#List of all the tables created
with engine.connect() as connection:
    res=connection.execute("show tables")
for x in res:
    print(x)

In [8]:
table_list=["transaction_data","merchant_data","account_data","customer_data","branch_data"]
dataset_list=["transaction_data.csv","merchant_data.csv","account_data.csv","customer_data.csv","branch_data.csv"]

for i in range(len(table_list)):
    load_file=f"load data inpath '/user/maneelcha49dgre/review_file1/{dataset_list[i]}' into table {table_list[i]}"
    with engine.connect() as connection:
        connection.execute(load_file)            
print('Successfully loaded')



In [9]:
#store all the tables record into pandas dataframe
trans_df=pd.read_sql("select * from transaction_data",engine)
merch_df=pd.read_sql("select * from merchant_data",engine)
acc_df=pd.read_sql("select * from account_data",engine)
cust_df=pd.read_sql("select * from customer_data",engine)
branch_df=pd.read_sql("select * from branch_data",engine)

print("Successfully created dataframe")

Successfully created dataframe


In [10]:
spark_trans_df=spark.createDataFrame(trans_df)
spark_merch_df=spark.createDataFrame(merch_df)
spark_acc_df=spark.createDataFrame(acc_df)
spark_cust_df=spark.createDataFrame(cust_df)
spark_branch_df=spark.createDataFrame(branch_df)

print("Successfully created spark dataframe")

Successfully created spark dataframe


### 2. DataFrame Merging (5 Marks):
● Task: Join all the data frames (account_data, transaction_data,customer_data, merchant_data, branch_data) into a single data frame called Full_DataFrame. Ensure the resulting data frame does not contain duplicate columns.

● Expected Result: Correct and efficient merging, with no duplicate columns

In [11]:
# join all the dataframes into one
merged_data=spark_acc_df.join(spark_cust_df,on=['Customer_ID'],how='left')
merged_data=merged_data.join(spark_branch_df,on=['Branch_Id'],how='left')
merged_data=spark_trans_df.join(merged_data,on=['Account_ID'],how='right')
merged_data=merged_data.join(spark_merch_df,on=['Merchant_ID'],how='left')
print(merged_data.show())
merged_data.count()

                                                                                

+-----------+----------+--------------+----------------+---------------+------+---------+-----------+------------+-------+-------------+-------------+----------------+-----------+--------+---------+-------------+-----------------+-------+
|merchant_id|account_id|transaction_id|transaction_date|processing_date|amount|branch_id|customer_id|account_type|balance|customer_name|customer_type|customer_country|branch_name|location|  manager|merchant_name|merchant_category|country|
+-----------+----------+--------------+----------------+---------------+------+---------+-----------+------------+-------+-------------+-------------+----------------+-----------+--------+---------+-------------+-----------------+-------+
|     MER006|    ACC010|        TXN006|      2024-08-05|     2024-07-29|252.03|   BID003|    CUST010|    Checking|3340.17|  Customer_10|   Individual|          Canada|   Branch_3| Toronto|Manager_3|   Merchant_6|    Entertainment|    USA|
|     MER015|    ACC012|        TXN015|     

                                                                                

22

### 3. Calculating Transaction Anomalies (5 Marks):
● Task: Identify transactions where the processing time exceeds 7 days and flag them as anomalies by creating a new column Anomaly.

● Expected Result: A new column Anomaly where transactions with processing time > 7 days are marked.

In [12]:
query=""" select * ,datediff(processing_date,transaction_date) as processing_time,
case
  when datediff(processing_date,transaction_date)>7 then 'YES'
  else 'NO'
end as Anomaly
from transaction_data"""
pd.read_sql(query,engine)

Unnamed: 0,transaction_id,account_id,merchant_id,transaction_date,processing_date,amount,processing_time,anomaly
0,TXN001,ACC012,MER001,2024-07-02,2024-07-20,354.28,18,YES
1,TXN002,ACC004,MER002,2024-09-03,2024-07-12,499.71,-53,NO
2,TXN003,ACC014,MER003,2024-08-29,2024-06-11,168.53,-79,NO
3,TXN004,ACC014,MER004,2024-08-10,2024-06-09,132.45,-62,NO
4,TXN005,ACC006,MER005,2024-06-13,2024-08-16,374.18,64,YES
5,TXN006,ACC010,MER006,2024-08-05,2024-07-29,252.03,-7,NO
6,TXN007,ACC014,MER007,2024-06-22,2024-09-02,288.11,72,YES
7,TXN008,ACC006,MER008,2024-07-04,2024-07-21,455.89,17,YES
8,TXN009,ACC005,MER009,2024-08-10,2024-08-25,375.36,15,YES
9,TXN010,ACC015,MER010,2024-07-01,2024-07-13,270.77,12,YES


### 4. Branch-Level Monthly Profit Trends (5 Marks):
● Task: For each branch, calculate the monthly total transaction amount and identify the month with the highest profit for each branch.

● Expected Result: Correctly calculated total monthly profits per branch and identified the month with the highest profit.


In [13]:
#import required libraries
from pyspark.sql.functions import col, year, month, sum, max, row_number
from pyspark.sql import Window
#copy original datset into temp variable
result=merged_data

result = result.withColumn("Year", year(col("transaction_date")))

result = result.withColumn("Month", month(col("transaction_date")))

#monthly transaction
monthly_trans_df = result.groupBy("branch_id", "Year", "Month") \
    .agg(sum("amount").alias("Monthly_Trans_Amount"))

#Use window function
window = Window.partitionBy("branch_id").orderBy(col("Monthly_Trans_Amount").desc())

highest_prof_df = monthly_trans_df.withColumn("Rank", row_number().over(window)) \
    .filter(col("Rank") == 1) \
    .drop("Rank")
highest_prof_df.show()


                                                                                

+---------+----+-----+--------------------+
|branch_id|Year|Month|Monthly_Trans_Amount|
+---------+----+-----+--------------------+
|   BID005|2024|    8|              300.98|
|   BID002|2024|    6|   839.5699999999999|
|   BID004|2024|    9|              499.71|
|   BID003|2024|    7|              971.17|
|   BID001|2024|    6|              191.21|
+---------+----+-----+--------------------+



### 5. Transaction Processing Time Calculation (5 Marks):
● Task: Create a new column ProcessingTime that calculates the difference in days between Transaction_Date and Processing_Date. Validate the calculation by showing sample data.

● Expected Result: Correct calculation of processing time with sample data validation.

In [14]:
query=""" select * ,datediff(processing_date,transaction_date) as processing_time
from transaction_data"""
pd.read_sql(query,engine)

Unnamed: 0,transaction_id,account_id,merchant_id,transaction_date,processing_date,amount,processing_time
0,TXN001,ACC012,MER001,2024-07-02,2024-07-20,354.28,18
1,TXN002,ACC004,MER002,2024-09-03,2024-07-12,499.71,-53
2,TXN003,ACC014,MER003,2024-08-29,2024-06-11,168.53,-79
3,TXN004,ACC014,MER004,2024-08-10,2024-06-09,132.45,-62
4,TXN005,ACC006,MER005,2024-06-13,2024-08-16,374.18,64
5,TXN006,ACC010,MER006,2024-08-05,2024-07-29,252.03,-7
6,TXN007,ACC014,MER007,2024-06-22,2024-09-02,288.11,72
7,TXN008,ACC006,MER008,2024-07-04,2024-07-21,455.89,17
8,TXN009,ACC005,MER009,2024-08-10,2024-08-25,375.36,15
9,TXN010,ACC015,MER010,2024-07-01,2024-07-13,270.77,12


### 6. Customer Transaction and Profit Analysis (5 Marks):
● Task: Calculate the total number of transactions, average transaction value, and total transaction amount for each customer. Order the data frame by Total_Transaction_Amount in descending order and discuss any trends or insights.

● Expected Result: Accurate calculations and insightful analysis.


In [15]:
merged_data.createOrReplaceTempView("merged_table")
query="""SELECT 
    customer_id,
    COUNT(transaction_id) AS no_of_trans,
    AVG(amount) AS avg_trans_amount,
    SUM(amount) AS total_trans_amount
FROM 
    merged_table
GROUP BY 
    customer_id
ORDER BY 
    total_trans_amount DESC"""
spark.sql(query).show()




+-----------+-----------+------------------+------------------+
|customer_id|no_of_trans|  avg_trans_amount|total_trans_amount|
+-----------+-----------+------------------+------------------+
|    CUST005|          3|            336.93|           1010.79|
|    CUST012|          2|            417.83|            835.66|
|    CUST006|          2|415.03499999999997| 830.0699999999999|
|    CUST010|          2|           299.075|            598.15|
|    CUST014|          3|196.36333333333334|            589.09|
|    CUST004|          1|            499.71|            499.71|
|    CUST015|          1|            270.77|            270.77|
|    CUST003|          1|            191.21|            191.21|
|    CUST007|          0|              null|              null|
|    CUST011|          0|              null|              null|
|    CUST001|          0|              null|              null|
|    CUST002|          0|              null|              null|
|    CUST009|          0|              n

                                                                                

## Bonus Task: Transaction Validation
##### Task Description: Implement a transaction validation algorithm to check for invalid transactions. The validation criteria will include:
1. Negative Amounts: Ensure that no transaction has a negative amount.

2. Future Dates: Ensure that no transaction date is set in the future.

3. Required Fields: Check that all required fields (e.g., transaction ID, amount, date) are present.


In [17]:
from pyspark.sql.functions import datediff
# copy Original data into temp varial
final_data = merged_data

# Function to validate transactions
def validate_transactions(final_data):
    # Query for negative balances
    negative_balance = final_data.filter("balance < 0")
    
    # Query for invalid date differences
    future_dates = final_data.filter(datediff("processing_date", "transaction_date") < 0)
    
    # Query for missing transaction details
    check_required_fields = final_data.filter(col('transaction_id').isNull()) \
                                       .filter(col('amount').isNull()) \
                                       .filter(col('transaction_date').isNull())
    
    # Combine all above queries
    return negative_balance.union(future_dates).union(check_required_fields)

# Display the results
validate_transactions(final_data).show()




+-----------+----------+--------------+----------------+---------------+------+---------+-----------+------------+-------+-------------+-------------+----------------+-----------+--------+---------+-------------+-----------------+-------+
|merchant_id|account_id|transaction_id|transaction_date|processing_date|amount|branch_id|customer_id|account_type|balance|customer_name|customer_type|customer_country|branch_name|location|  manager|merchant_name|merchant_category|country|
+-----------+----------+--------------+----------------+---------------+------+---------+-----------+------------+-------+-------------+-------------+----------------+-----------+--------+---------+-------------+-----------------+-------+
|     MER006|    ACC010|        TXN006|      2024-08-05|     2024-07-29|252.03|   BID003|    CUST010|    Checking|3340.17|  Customer_10|   Individual|          Canada|   Branch_3| Toronto|Manager_3|   Merchant_6|    Entertainment|    USA|
|     MER004|    ACC014|        TXN004|     

                                                                                

### Problem Statement: Counting Prime Numbers with Kafka (10 Marks)

In [19]:
!pip install kafka-python

Defaulting to user installation because normal site-packages is not writeable


In [45]:
from kafka import KafkaProducer
import csv
import json
#starting a topic with kafka producer
def kafka_producer():
    #initialize the producer
    producer=KafkaProducer(
        bootstrap_servers=['master:9092'],
        value_serializer=lambda x:json.dumps(x).encode('utf-8')
    )
    i=1
    while(i<101):
        topic_name="num_topics"
        message=int(i)
        producer.send(topic_name,message)
        i+=1
    producer.flush()
    producer.close()
    
if __name__=="__main__":
    kafka_producer()

In [47]:
from kafka import KafkaConsumer
import json
from collections import Counter
from math import sqrt

def check_prime(num):
    global number
    if(num<2):
        return False
    if(num==2):
        return True
    for i in range(2,int(sqrt(num)+1)):
        if(num%i==0):
            return False
    return True;
        

def kafka_consumer_example(lm):
    consumer = KafkaConsumer(
        'num_topics',
        bootstrap_servers = ['master:9092'],
        auto_offset_reset = 'earliest',
        enable_auto_commit = True,
        value_deserializer=lambda x: int(x.decode('utf-8'))
    )
    count=0
    total=0
    for num in consumer:
        val=num.value
        if(check_prime(val)):
            count+=1
        total+=1
        if(total>=lm):
            break
    print("Total Number of prime numbers from 1 to 100 :",count)
    consumer.close()

if __name__ == "__main__":
    kafka_consumer_example(100)

Total Number of prime numbers from 1 to 100 : 25
