<a href="https://colab.research.google.com/github/AkshaySarkar/Transaction-Big-Data-Processing/blob/main/Big_Data_Processing_of_Transaction_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install wget pyspark  findspark


Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Building wheels for collected packages: wget, pyspark
  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9656 sha256=84aa76635b02568553f13a40d0e8c98251660f30a48aeb2294e80ff6d35a7cd8
  Stored in directory: /root/.cache/pip/wheels/8b/f1/7f/5c94f0a7a505ca1c81cd1d9208ae2064675d97582078e6c769
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=965ded23fd00c100c587b01c803aa6d5e51ff9127665b96a2b0

In [2]:
import findspark

findspark.init()

In [3]:
#initializing spark context
from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

In [4]:
sc = SparkContext.getOrCreate()

# Creating a Spark Session

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

In [5]:
# downloading the dataset using wget
import pandas as pd
import wget

link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
wget.download(link_to_data1)

link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'
wget.download(link_to_data2)

'dataset2.csv'

In [6]:
df1 = spark.read.csv("dataset1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("dataset2.csv", header=True, inferSchema=True)

In [7]:
df1.printSchema()
df2.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)



In [8]:
from pyspark.sql.functions import year, quarter, to_date

#Adding new column 'year' to df1
df1 = df1.withColumn('year', year(to_date('date_column','dd/MM/yyyy')))

#Adding new column 'quarter' to df2
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date','dd/MM/yyyy')))
df1.printSchema()
df2.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)
 |-- year: integer (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)
 |-- quarter: integer (nullable = true)



In [9]:
#Renaming df1 column amount to transaction_amount
df1 = df1.withColumnRenamed('amount', 'transaction_amount')

#Renaming df2 column value to transaction_value
df2 = df2.withColumnRenamed('value', 'transaction_value')
df1.head()

Row(customer_id=1, date_column='1/1/2022', transaction_amount=5000, description='Purchase A', location='Store A', year=2022)

In [10]:
#Dropping unnecessary columns
#Dropping columns description and location from df1
df1 = df1.drop('description', 'location')

#Drop column notes from df2
df2 = df2.drop('notes')
df1.head()

Row(customer_id=1, date_column='1/1/2022', transaction_amount=5000, year=2022)

In [11]:
#joining df1 and df2 based on common column customer_id
joined_df = df1.join(df2, 'customer_id', 'inner')
joined_df.show(5)

+-----------+-----------+------------------+----+----------------+-----------------+-------+
|customer_id|date_column|transaction_amount|year|transaction_date|transaction_value|quarter|
+-----------+-----------+------------------+----+----------------+-----------------+-------+
|          1|   1/1/2022|              5000|2022|        1/1/2022|             1500|      1|
|          2|  15/2/2022|              1200|2022|       15/2/2022|             2000|      1|
|          3|  20/3/2022|               800|2022|       20/3/2022|             1000|      1|
|          4|  10/4/2022|              3000|2022|       10/4/2022|             2500|      2|
|          5|   5/5/2022|              6000|2022|        5/5/2022|             1800|      2|
+-----------+-----------+------------------+----+----------------+-----------------+-------+
only showing top 5 rows



In [12]:
# filtering the dataframe for transaction amount > 1000
filtered_df = joined_df.filter("transaction_amount > 1000")
filtered_df.show(5)

+-----------+-----------+------------------+----+----------------+-----------------+-------+
|customer_id|date_column|transaction_amount|year|transaction_date|transaction_value|quarter|
+-----------+-----------+------------------+----+----------------+-----------------+-------+
|          1|   1/1/2022|              5000|2022|        1/1/2022|             1500|      1|
|          2|  15/2/2022|              1200|2022|       15/2/2022|             2000|      1|
|          4|  10/4/2022|              3000|2022|       10/4/2022|             2500|      2|
|          5|   5/5/2022|              6000|2022|        5/5/2022|             1800|      2|
|          6|  10/6/2022|              4500|2022|       10/6/2022|             1200|      2|
+-----------+-----------+------------------+----+----------------+-----------------+-------+
only showing top 5 rows



In [13]:
from pyspark.sql.functions import sum

# grouping by customer_id and aggregating the sum of transaction amount

total_amount_per_customer = filtered_df.groupBy('customer_id').agg(sum('transaction_amount').alias('total_amount'))
total_amount_per_customer.show(10)

+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|         31|        3200|
|         85|        1800|
|         78|        1500|
|         34|        1200|
|         81|        5500|
|         28|        2600|
|         76|        2600|
|         27|        4200|
|         91|        3200|
|         22|        1200|
+-----------+------------+
only showing top 10 rows



In [14]:
# Writing total_amount_per_customer to a Hive table named customer_totals
total_amount_per_customer.write.mode("overwrite").saveAsTable("customer_totals")

In [15]:
#Writing filtered_df to HDFS in parquet format file filtered_data
filtered_df.write.mode("overwrite").parquet("filtered_data.parquet")

In [16]:
from pyspark.sql.functions import when, lit

# adding new column with value indicating whether transaction amount is > 5000 or not
df1 = df1.withColumn("high_value", when(df1.transaction_amount > 5000, lit("Yes")).otherwise(lit("No")))
df1.show(5)

+-----------+-----------+------------------+----+----------+
|customer_id|date_column|transaction_amount|year|high_value|
+-----------+-----------+------------------+----+----------+
|          1|   1/1/2022|              5000|2022|        No|
|          2|  15/2/2022|              1200|2022|        No|
|          3|  20/3/2022|               800|2022|        No|
|          4|  10/4/2022|              3000|2022|        No|
|          5|   5/5/2022|              6000|2022|       Yes|
+-----------+-----------+------------------+----+----------+
only showing top 5 rows



In [17]:
from pyspark.sql.functions import avg

#calculating the average transaction value for each quarter in df2
average_value_per_quarter = df2.groupBy('quarter').agg(avg("transaction_value").alias("avg_trans_val"))


#showing the average transaction value for each quarter in df2
average_value_per_quarter.show()

+-------+------------------+
|quarter|     avg_trans_val|
+-------+------------------+
|      1| 1111.111111111111|
|      3|1958.3333333333333|
|      4| 816.6666666666666|
|      2|            1072.0|
+-------+------------------+



In [18]:
# writing average_value_per_quarter to a Hive table named quarterly_averages

average_value_per_quarter.write.mode("overwrite").saveAsTable("quarterly_averages")

In [19]:
# calculating the total transaction value for each year in df1 and saving it to hdfs in csv format
total_value_per_year = df1.groupBy('year').agg(sum("transaction_amount").alias("total_transaction_val"))
total_value_per_year.write.mode("overwrite").csv("total_value_per_year.csv")
total_value_per_year.show()

+----+---------------------+
|year|total_transaction_val|
+----+---------------------+
|2025|                25700|
|2027|                25700|
|2023|                28100|
|2022|                29800|
|2026|                25700|
|2029|                25700|
|2030|                 9500|
|2028|                25700|
|2024|                25700|
+----+---------------------+

