In [None]:
# Installing required packages

!pip install wget pyspark  findspark

#### Prework - Initiate the Spark Session


In [1]:
# findspark is initialised before any `pyspark` import in the cells below.
# NOTE(review): presumably this is what makes the local Spark installation
# importable in this environment — confirm it is still needed when pyspark
# is installed via pip.
import findspark

findspark.init()

In [2]:
# PySpark is the Spark API for Python. We use PySpark to initialize the SparkContext.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import wget

In [3]:
# Obtain the SparkContext: reuses the active one if it exists, otherwise
# starts a new one.
sc = SparkContext.getOrCreate()

# Build (or fetch) the SparkSession that every later cell uses as `spark`.
# The builder chain is wrapped in parentheses instead of using backslash
# line continuations.
spark = (
    SparkSession.builder
    .appName("Python Spark DataFrames basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

### Loading Data From CSV

In [4]:
# Load both CSV inputs into PySpark DataFrames. The first line of each file
# is treated as the header and column types are inferred from the data.
df1 = spark.read.options(header=True, inferSchema=True).csv('purchases.csv')
df2 = spark.read.options(header=True, inferSchema=True).csv('transactions.csv')

###  Display the schema of both dataframes



In [5]:
# Print the inferred schema of each DataFrame: df1 first, then df2.
for frame in (df1, df2):
    frame.printSchema()


root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)



#### Add a new column to each dataframe



In [18]:
from pyspark.sql.functions import year, quarter, to_date

# The date strings in both files are NOT zero-padded (e.g. "1/1/2022",
# "15/2/2022"), so the pattern uses single-letter day and month fields.
# With a pattern that matches the data, the session-wide
# spark.sql.legacy.timeParserPolicy = LEGACY workaround is no longer needed
# and is not set here.
DATE_PATTERN = 'd/M/yyyy'

# Add new column `year` to df1, extracted from the parsed `date_column`.
df1 = df1.withColumn('year', year(to_date('date_column', DATE_PATTERN)))
df1.show()

# Add new column `quarter` to df2, extracted from the parsed `transaction_date`.
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date', DATE_PATTERN)))
df2.show()


+-----------+-----------+------+-----------+--------+----+
|customer_id|date_column|amount|description|location|year|
+-----------+-----------+------+-----------+--------+----+
|          1|   1/1/2022|  5000| Purchase A| Store A|2022|
|          2|  15/2/2022|  1200| Purchase B| Store B|2022|
|          3|  20/3/2022|   800| Purchase C| Store C|2022|
|          4|  10/4/2022|  3000| Purchase D| Store D|2022|
|          5|   5/5/2022|  6000| Purchase E| Store E|2022|
|          6|  10/6/2022|  4500| Purchase F| Store F|2022|
|          7|  15/7/2022|   200| Purchase G| Store G|2022|
|          8|  20/8/2022|  3500| Purchase H| Store H|2022|
|          9|  25/9/2022|   700| Purchase I| Store I|2022|
|         10| 30/10/2022|  1800| Purchase J| Store J|2022|
|         11|  5/11/2022|  2200| Purchase K| Store K|2022|
|         12| 10/12/2022|   900| Purchase L| Store L|2022|
|         13|  15/1/2023|  4800| Purchase M| Store M|2023|
|         14|  20/2/2023|   300| Purchase N| Store N|202

####  Rename columns in both dataframes



In [22]:
# df1: the `amount` column becomes `transaction_amount`.
df1 = df1.withColumnRenamed(existing='amount', new='transaction_amount')
df1.show()

# df2: the `value` column becomes `transaction_value`.
df2 = df2.withColumnRenamed(existing='value', new='transaction_value')
df2.show()


+-----------+-----------+------------------+-----------+--------+----+
|customer_id|date_column|transaction_amount|description|location|year|
+-----------+-----------+------------------+-----------+--------+----+
|          1|   1/1/2022|              5000| Purchase A| Store A|2022|
|          2|  15/2/2022|              1200| Purchase B| Store B|2022|
|          3|  20/3/2022|               800| Purchase C| Store C|2022|
|          4|  10/4/2022|              3000| Purchase D| Store D|2022|
|          5|   5/5/2022|              6000| Purchase E| Store E|2022|
|          6|  10/6/2022|              4500| Purchase F| Store F|2022|
|          7|  15/7/2022|               200| Purchase G| Store G|2022|
|          8|  20/8/2022|              3500| Purchase H| Store H|2022|
|          9|  25/9/2022|               700| Purchase I| Store I|2022|
|         10| 30/10/2022|              1800| Purchase J| Store J|2022|
|         11|  5/11/2022|              2200| Purchase K| Store K|2022|
|     

#### Drop unnecessary columns





In [25]:
# df1: remove the `description` and `location` columns.
df1 = df1.drop(*['description', 'location'])

# df2: remove the `notes` column.
df2 = df2.drop('notes')


#### Join dataframes based on a common column






In [29]:
# Inner-join df1 with df2 on the shared `customer_id` column, then preview
# the combined rows.
joined_df = df1.join(other=df2, on=['customer_id'], how='inner')
joined_df.show()

+-----------+-----------+------------------+----+----------------+-----------------+-------+
|customer_id|date_column|transaction_amount|year|transaction_date|transaction_value|quarter|
+-----------+-----------+------------------+----+----------------+-----------------+-------+
|          1|   1/1/2022|              5000|2022|        1/1/2022|             1500|      1|
|          2|  15/2/2022|              1200|2022|       15/2/2022|             2000|      1|
|          3|  20/3/2022|               800|2022|       20/3/2022|             1000|      1|
|          4|  10/4/2022|              3000|2022|       10/4/2022|             2500|      2|
|          5|   5/5/2022|              6000|2022|        5/5/2022|             1800|      2|
|          6|  10/6/2022|              4500|2022|       10/6/2022|             1200|      2|
|          7|  15/7/2022|               200|2022|       15/7/2022|              700|      3|
|          8|  20/8/2022|              3500|2022|       20/8/2022|    

####  Filter data based on a condition




In [33]:
# Keep only the joined rows whose transaction_amount is strictly greater
# than 1000, then preview them.
filtered_df = joined_df.where('transaction_amount > 1000')
filtered_df.show()


+-----------+-----------+------------------+----+----------------+-----------------+-------+
|customer_id|date_column|transaction_amount|year|transaction_date|transaction_value|quarter|
+-----------+-----------+------------------+----+----------------+-----------------+-------+
|          1|   1/1/2022|              5000|2022|        1/1/2022|             1500|      1|
|          2|  15/2/2022|              1200|2022|       15/2/2022|             2000|      1|
|          4|  10/4/2022|              3000|2022|       10/4/2022|             2500|      2|
|          5|   5/5/2022|              6000|2022|        5/5/2022|             1800|      2|
|          6|  10/6/2022|              4500|2022|       10/6/2022|             1200|      2|
|          8|  20/8/2022|              3500|2022|       20/8/2022|             3000|      3|
|         10| 30/10/2022|              1800|2022|      30/10/2022|             1200|      4|
|         11|  5/11/2022|              2200|2022|       5/11/2022|    

####  Aggregate data by customer



In [45]:
# NOTE: this import shadows Python's built-in sum() from here on; a later
# cell in this notebook also relies on this name being the Spark function.
from pyspark.sql.functions import sum

# Group by customer_id and total the transaction amounts.
#
# The same aggregation can be spelled two ways:
#   method 1: .agg({'transaction_amount': 'sum'})
#             (the result column gets an auto-generated name)
#   method 2: .agg(sum('transaction_amount').alias('total_amount'))
#             (the result column is named explicitly)
# Only method 2 is executed: previously method 1 was assigned to the same
# variable and immediately overwritten, so it had no effect on the result.
total_amount_per_person = (
    filtered_df
    .groupBy('customer_id')
    .agg(sum('transaction_amount').alias('total_amount'))
)

# Display the result.
total_amount_per_person.show()



+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|         31|        3200|
|         85|        1800|
|         78|        1500|
|         34|        1200|
|         81|        5500|
|         28|        2600|
|         76|        2600|
|         27|        4200|
|         91|        3200|
|         22|        1200|
|         93|        5500|
|          1|        5000|
|         52|        2600|
|         13|        4800|
|          6|        4500|
|         16|        2600|
|         40|        2600|
|         94|        1200|
|         57|        5500|
|         54|        1500|
+-----------+------------+
only showing top 20 rows



#### Write the result to the local HDFS



In [63]:
# Write total_amount_per_person to the local HDFS instance as CSV with a
# header row, replacing whatever already exists at the target path.
total_amount_per_person.write.csv(
    'hdfs://localhost:8020/user/hive/warehouse/bankdata/total_amount_per_person.csv',
    mode="overwrite",
    header="true",
)


In [None]:
# Read the data back for trial purposes.
# The CSV writer in the previous cell produces a directory of part files at
# the given path, so the directory itself is what gets read here. The earlier
# trailing "/filename" was a placeholder that does not exist in that output.
df = spark.read.csv(
    'hdfs://localhost:8020/user/hive/warehouse/bankdata/total_amount_per_person.csv',
    inferSchema=True,
    header=True,
)


#### Write the filtered data to HDFS in Local



In [65]:
# Write filtered_df to the local HDFS instance in Parquet format, replacing
# any previous output at the target path.
# NOTE(review): "header" is a CSV-style option; it is presumably ignored by
# the Parquet writer — confirm and drop it if so.
filtered_df.write.option("header", "true").parquet(
    'hdfs://localhost:8020/user/hive/warehouse/bankdata/filtered_df.parquet',
    mode="overwrite",
)

#### Add a new column based on a condition



In [75]:
from pyspark.sql.functions import when

# Flag every df1 row: 'Yes' when transaction_amount is strictly greater than
# 5000 (so exactly 5000 is "No"), otherwise "No".
exceeds_5000 = df1['transaction_amount'] > 5000
df1 = df1.withColumn('indicator', when(exceeds_5000, 'Yes').otherwise("No"))

df1.show()

+-----------+-----------+------------------+----+---------+
|customer_id|date_column|transaction_amount|year|indicator|
+-----------+-----------+------------------+----+---------+
|          1|   1/1/2022|              5000|2022|       No|
|          2|  15/2/2022|              1200|2022|       No|
|          3|  20/3/2022|               800|2022|       No|
|          4|  10/4/2022|              3000|2022|       No|
|          5|   5/5/2022|              6000|2022|      Yes|
|          6|  10/6/2022|              4500|2022|       No|
|          7|  15/7/2022|               200|2022|       No|
|          8|  20/8/2022|              3500|2022|       No|
|          9|  25/9/2022|               700|2022|       No|
|         10| 30/10/2022|              1800|2022|       No|
|         11|  5/11/2022|              2200|2022|       No|
|         12| 10/12/2022|               900|2022|       No|
|         13|  15/1/2023|              4800|2023|       No|
|         14|  20/2/2023|               

#### Calculate the average transaction value per quarter



In [73]:
from pyspark.sql.functions import avg

# Average transaction_value within each quarter of df2; the result column is
# named avg_trans_val.
average_value_per_quarter = (
    df2
    .groupBy("quarter")
    .agg(avg('transaction_value').alias('avg_trans_val'))
)

# Preview the per-quarter averages.
average_value_per_quarter.show()


+-------+------------------+
|quarter|     avg_trans_val|
+-------+------------------+
|      1| 1111.111111111111|
|      3|1958.3333333333333|
|      4| 816.6666666666666|
|      2|            1072.0|
+-------+------------------+



####  Write the result to the local HDFS



In [74]:
# Write average_value_per_quarter to the local HDFS instance as CSV with a
# header row, replacing any previous output at the target path.
average_value_per_quarter.write.csv(
    'hdfs://localhost:8020/user/hive/warehouse/bankdata/quarterly_averages',
    mode="overwrite",
    header="true",
)


#### Calculate the total transaction value per year


In [78]:
# Sum transaction_amount within each year of df1; the result column is named
# total_per_year. `sum` here is the Spark aggregate imported in an earlier
# cell, not Python's built-in.
total_value_per_year = (
    df1
    .groupBy('year')
    .agg(sum('transaction_amount').alias('total_per_year'))
)

# Preview the per-year totals.
total_value_per_year.show()


+----+--------------+
|year|total_per_year|
+----+--------------+
|2025|         25700|
|2027|         25700|
|2023|         28100|
|2022|         29800|
|2026|         25700|
|2029|         25700|
|2030|          9500|
|2028|         25700|
|2024|         25700|
+----+--------------+



#### Task 15: Write the result to HDFS in Local




In [79]:
# Write total_value_per_year to the local HDFS instance as CSV with a header
# row, replacing any previous output at the target path.
total_value_per_year.write.csv(
    'hdfs://localhost:8020/user/hive/warehouse/bankdata/total_value_per_year',
    mode="overwrite",
    header="true",
)


In [None]:
# Stop the Spark session created at the top of the notebook. Cells that use
# `spark` cannot be re-run after this until the session is created again.
spark.stop()