In [None]:
!pip install wget pyspark  findspark

In [2]:
# Locate the local Spark installation before pyspark is imported in the next cell
# (presumably by putting it on sys.path -- see the findspark documentation).
import findspark
findspark.init()

In [4]:
from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

In [5]:
# Build (or reuse) the SparkSession first and derive the SparkContext from it.
# Previously SparkContext.getOrCreate() ran first, so the builder below reused that
# already-running context and the appName was never applied to it.
spark = (
    SparkSession.builder
    .appName("Python Spark DataFrames basic example")
    # Later cells persist results with saveAsTable() and describe them as Hive tables;
    # that requires Hive support on the session (otherwise an in-memory catalog is used).
    .enableHiveSupport()
    # The date columns are parsed with to_date(..., 'dd/MM/yyyy') under the LEGACY
    # parser policy (previously only set just before the parquet write). Setting it
    # here makes every cell see the same parsing rules on a fresh kernel.
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

# Kept so that any code expecting a module-level `sc` still works.
sc = spark.sparkContext

24/04/25 13:14:27 WARN Utils: Your hostname, javier-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.81.128 instead (on interface ens33)
24/04/25 13:14:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/25 13:14:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Download datasets

In [6]:
import os

import wget

# Source CSVs for this lab; each is saved under its own base name in the working directory.
link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'

for link in (link_to_data1, link_to_data2):
    local_name = os.path.basename(link)  # "dataset1.csv" / "dataset2.csv", the names the next cell reads
    # wget.download never overwrites an existing file; it saves a numbered copy
    # (e.g. "dataset1 (1).csv") instead. Skipping files that are already present keeps
    # this cell idempotent under Restart & Run All.
    if not os.path.exists(local_name):
        wget.download(link, out=local_name)

'dataset2.csv'

## Load the data into PySpark DataFrames

In [7]:
# Both files start with a header row; let Spark infer the column types
# (the resulting schemas are printed in the next cell).
csv_reader = spark.read.options(header=True, inferSchema=True)
df1 = csv_reader.csv("dataset1.csv")
df2 = csv_reader.csv("dataset2.csv")

                                                                                

## Print the schemas of df1 and df2
    

In [8]:
# Show the inferred column names and types of both source DataFrames.
for source_df in (df1, df2):
    source_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)



In [12]:
from pyspark.sql.functions import year, quarter, to_date

In [13]:
# The date fields were read as strings (see the printed schemas), so parse them with an
# explicit day-first pattern before extracting date parts.
# NOTE(review): the per-year totals computed later span 2022-2030 with many identical
# values; confirm the CSV dates really are dd/MM/yyyy, because a mismatched pattern
# could silently shift dates into the wrong year.
DATE_PATTERN = 'dd/MM/yyyy'

df1 = df1.withColumn('year', year(to_date('date_column', DATE_PATTERN)))             # df1: calendar year
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date', DATE_PATTERN)))  # df2: quarter 1-4

In [16]:
# Give the generic amount/value columns descriptive names.
df1, df2 = (
    df1.withColumnRenamed('amount', 'transaction_amount'),
    df2.withColumnRenamed('value', 'transaction_value'),
)

In [17]:
# Remove columns not used later: description/location from df1, notes from df2.
unused_in_df1 = ('description', 'location')
df1 = df1.drop(*unused_in_df1)
df2 = df2.drop('notes')

In [18]:
# Inner join on the shared key: only customer_ids present in both DataFrames are kept.
# NOTE(review): if a customer_id occurs in several rows of both DataFrames, each df1 row
# is paired with every matching df2 row, multiplying the amounts summed later --
# confirm customer_id is unique on at least one side, or join on a finer key.
joined_df = df1.join(df2, on='customer_id', how='inner')

In [19]:
# Keep only rows whose transaction_amount is strictly greater than 1000.
filtered_df = joined_df.where(joined_df['transaction_amount'] > 1000)

In [20]:
from pyspark.sql.functions import sum
# NOTE(review): this import shadows Python's built-in sum() for the rest of the session.

# Sum the filtered transaction amounts for each customer.
total_amount_per_customer = (
    filtered_df
    .groupBy('customer_id')
    .agg(sum('transaction_amount').alias('total_amount'))
)

# show() prints only the first 20 rows.
total_amount_per_customer.show()

[Stage 5:>                                                          (0 + 1) / 1]

+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|         31|        3200|
|         85|        1800|
|         78|        1500|
|         34|        1200|
|         81|        5500|
|         28|        2600|
|         76|        2600|
|         27|        4200|
|         91|        3200|
|         22|        1200|
|         93|        5500|
|          1|        5000|
|         52|        2600|
|         13|        4800|
|          6|        4500|
|         16|        2600|
|         40|        2600|
|         94|        1200|
|         57|        5500|
|         54|        1500|
+-----------+------------+
only showing top 20 rows



                                                                                

In [21]:
# Persist the per-customer totals as the table `customer_totals`, replacing any earlier version.
# NOTE(review): this is a Hive metastore table only if the SparkSession has Hive support enabled.
total_amount_per_customer.write.saveAsTable("customer_totals", mode="overwrite")

                                                                                

In [23]:
# Writing filtered_df evaluates the to_date(...) calls behind its year/quarter columns,
# so switch to the LEGACY time-parser policy first (presumably the default parser
# rejected these date strings here -- confirm).
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Relative path on Spark's default filesystem; NOTE(review): this is HDFS only if the
# session is configured that way.
filtered_df.write.parquet("filtered_data.parquet", mode="overwrite")

                                                                                

In [24]:
from pyspark.sql.functions import when, lit

# Flag each df1 transaction: "Yes" when transaction_amount is strictly above 5000,
# otherwise "No" (rows with a null amount also fall through to "No").
above_threshold = df1['transaction_amount'] > 5000
df1 = df1.withColumn("high_value", when(above_threshold, lit("Yes")).otherwise(lit("No")))

In [32]:
# Use the module alias instead of `from ... import avg, round`: importing `round`
# directly shadows Python's built-in round() for the rest of the session.
from pyspark.sql import functions as F

# Average transaction value per calendar quarter in df2, rounded to 0 decimal places
# (the result stays floating point, e.g. 1111.0), listed in quarter order.
average_value_per_quarter = (
    df2.groupBy('quarter')
    .agg(F.round(F.avg('transaction_value')).alias('avg_trans_val'))
    .orderBy('quarter')
)

average_value_per_quarter.show()

[Stage 27:>                                                         (0 + 1) / 1]

+-------+-------------+
|quarter|avg_trans_val|
+-------+-------------+
|      1|       1111.0|
|      2|       1072.0|
|      3|       1958.0|
|      4|        817.0|
+-------+-------------+



                                                                                

In [33]:
# Persist the quarterly averages as the table `quarterly_averages`, replacing any earlier version.
# NOTE(review): this is a Hive metastore table only if the SparkSession has Hive support enabled.
average_value_per_quarter.write.saveAsTable("quarterly_averages", mode="overwrite")

                                                                                

In [35]:
# Total transaction amount per year over all df1 rows (not the filtered join), in year order.
# `sum` here is pyspark.sql.functions.sum, imported in an earlier cell.
total_value_per_year = (
    df1.groupBy('year')
    .agg(sum("transaction_amount").alias("total_transaction_val"))
    .orderBy("year")
)

total_value_per_year.show()

+----+---------------------+
|year|total_transaction_val|
+----+---------------------+
|2022|                29800|
|2023|                28100|
|2024|                25700|
|2025|                25700|
|2026|                25700|
|2027|                25700|
|2028|                25700|
|2029|                25700|
|2030|                 9500|
+----+---------------------+



In [36]:
# Write the yearly totals as CSV to a relative path on Spark's default filesystem
# (NOTE(review): this is HDFS only if the session is configured that way).
# Spark writes this path as a directory of part files. header=True puts the column names
# (year, total_transaction_val) at the top of each part file; without it they are lost.
total_value_per_year.write.mode("overwrite").option("header", True).csv("total_value_per_year.csv")

                                                                                