<a href="https://colab.research.google.com/github/alameenwaziri/Machine-Learning-with-Apache-Spark/blob/main/Data%20Processing%20Using%20Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<p style="text-align:center">
    <a href="https://skills.network" target="_blank">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo">
    </a>
</p>


# Practice Project

Estimated time needed: **60** minutes

This practice project focuses on data transformation and integration using PySpark. Work with two datasets, perform various transformations such as adding columns, renaming columns, dropping unnecessary columns, joining dataframes, and finally, writing the results into both a Hive warehouse and an HDFS file system.


### Prerequisites

For this lab, we will use wget, Python and Spark (PySpark). Therefore, it's essential to make sure that the below-specified libraries are installed in the lab environment or within Skills Network (SN) Labs.




In [1]:
# Installing required packages

!pip install wget pyspark  findspark

Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9656 sha256=98823e55abfd9c581939673540ad492cb7a905c23b729d2a800995f503211b85
  Stored in directory: /root/.cache/pip/wheels/8b/f1/7f/5c94f0a7a505ca1c81cd1d9208ae2064675d97582078e6c769
Successfully built wget
Installing collected packages: wget, findspark
Successfully installed findspark-2.0.1 wget-3.2


#### Prework - Initiate the Spark Session


In [2]:
import findspark

findspark.init()

In [3]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

In [4]:
# Creating a SparkContext object

sc = SparkContext.getOrCreate()

# Creating a Spark Session

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Task 1: Load datasets into PySpark DataFrames

Download the datasets from the folloing links using `wget` and load it in a Spark Dataframe.

1. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv  
2. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv  

*Hint: Import wget*


In [5]:
#download dataset using wget
import wget

link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
wget.download(link_to_data1)

link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'
wget.download(link_to_data2)

'dataset2.csv'

In [6]:
#load the data into a pyspark dataframe

df1 = spark.read.csv("dataset1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("dataset2.csv", header=True, inferSchema=True)

### Task 2: Display the schema of both dataframes

Display the schema of `df1` and `df2` to understand the structure of the datasets.


In [7]:
#print the schema of df1 and df2

df1.printSchema()
df2.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)



#### Task 3: Add a new column to each dataframe

Add a new column named **year** to `df1` and **quarter** to `df2` representing the year and quarter of the data.

*Hint: use withColumn. Convert the date columns which are present as string to date before extracting the year and quarter information*




In [8]:
from pyspark.sql.functions import year, quarter, to_date

#Add new column year to df1
df1 = df1.withColumn('year', year(to_date('date_column','dd/MM/yyyy')))

# Add new column quarter to df2
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date','dd/MM/yyyy')))



#### Task 4: Rename columns in both dataframes

Rename the column **amount** to **transaction_amount** in `df1` and **value** to **transaction_value** in `df2`.

*Hint: Use withColumnRenamed*


In [9]:
#Rename df1 column amount to transaction_amount
df1 = df1.withColumnRenamed('amount', 'transaction_amount')

# Rename df2 column value to transaction_value
df2 = df2.withColumnRenamed('value', 'transaction_value')


#### Task 5: Drop unnecessary columns

Drop the columns **description** and **location** from `df1` and **notes** from `df2`.




In [10]:
# Drop columns description and location from df1
df1 = df1.drop('description', 'location')

# Drop column notes from df2
df2 = df2.drop('notes')



#### Task 6: Join dataframes based on a common column

Join `df1` and `df2` based on the common column **customer_id** and create a new dataframe named `joined_df`.




In [11]:
#join df1 and df2 based on common column customer_id
joined_df = df1.join(df2, 'customer_id', 'inner')

#### Task 7: Filter data based on a condition

Filter `joined_df` to include only transactions where "transaction_amount" is greater than 1000 and create a new dataframe named `filtered_df`.





In [12]:
# filter the dataframe for transaction amount > 1000
filtered_df = joined_df.filter("transaction_amount > 1000")

#### Task 8: Aggregate data by customer

Calculate the total transaction amount for each customer in `filtered_df` and display the result.

*Hint: Use sum from pyspark.sql.functions*


In [13]:
from pyspark.sql.functions import sum

# group by customer_id and aggregate the sum of transaction amount

total_amount_per_customer = filtered_df.groupBy('customer_id').agg(sum('transaction_amount').alias('total_amount'))

# display the result
total_amount_per_customer.show()


+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|         31|        3200|
|         85|        1800|
|         78|        1500|
|         34|        1200|
|         81|        5500|
|         28|        2600|
|         76|        2600|
|         27|        4200|
|         91|        3200|
|         22|        1200|
|         93|        5500|
|          1|        5000|
|         52|        2600|
|         13|        4800|
|          6|        4500|
|         16|        2600|
|         40|        2600|
|         94|        1200|
|         57|        5500|
|         54|        1500|
+-----------+------------+
only showing top 20 rows



#### Task 9: Write the result to a Hive table

Write `total_amount_per_customer` to a Hive table named **customer_totals**.


In [17]:
# Write total_amount_per_customer to a Hive table named customer_totals
total_amount_per_customer.write.mode("overwrite").saveAsTable("customer_totals")

#### Task 10: Write the filtered data to HDFS

Write `filtered_df` to HDFS in parquet format to a file named **filtered_data**.


In [23]:
#Write filtered_df to HDFS in parquet format file filtered_data

from pyspark.sql.functions import to_date
from pyspark.sql import SparkSession

# Initialize Spark session and set legacy time parser policy
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Assuming the date column is named 'date_column'
filtered_df = filtered_df.withColumn('date_column', to_date('date_column', 'MM/dd/yyyy'))

# Write the DataFrame to parquet
filtered_df.write.mode("overwrite").parquet("filtered_data.parquet")



#### Task 11: Add a new column based on a condition

Add a new column named **high_value** to `df1` indicating whether the transaction_amount is greater than 5000. When the value is greater than 5000, the value of the column should be **Yes**. When the value is less than or equal to 5000, the value of the column should be **No**.

*Hint: Use when and lit from pyspark.sql.functions


In [24]:
# Add new column with value indicating whether transaction amount is > 5000 or not

from pyspark.sql.functions import when, lit

# Add new column with value indicating whether transaction amount is > 5000 or not
df1 = df1.withColumn("high_value", when(df1.transaction_amount > 5000, lit("Yes")).otherwise(lit("No")))

#### Task 12: Calculate the average transaction value per quarter

Calculate and display the average transaction value for each quarter in `df2` and create a new dataframe named `average_value_per_quarter` with column `avg_trans_val`.

*Hint: Use avg from pyspark.sql.functions*


In [25]:
#calculate the average transaction value for each quarter in df2

from pyspark.sql.functions import avg

# calculate the average transaction value for each quarter in df2
average_value_per_quarter = df2.groupBy('quarter').agg(avg("transaction_value").alias("avg_trans_val"))


# show the average transaction value for each quarter in df2
average_value_per_quarter.show()


+-------+------------------+
|quarter|     avg_trans_val|
+-------+------------------+
|      1| 1111.111111111111|
|      3|1958.3333333333333|
|      4| 816.6666666666666|
|      2|            1072.0|
+-------+------------------+



#### Task 13: Write the result to a Hive table

Write `average_value_per_quarter` to a Hive table named **quarterly_averages**.


In [26]:
#Write average_value_per_quarter to a Hive table named quarterly_averages
average_value_per_quarter.write.mode("overwrite").saveAsTable("quarterly_averages")

#### Task 14: Calculate the total transaction value per year

Calculate and display the total transaction value for each year in `df1` and create a new dataframe named `total_value_per_year` with column `total_transaction_val`.
> **Note:** The provided DataFrame `df1` does not explicitly have a `year` column initially. However, in Task 3, a new column named `year` is added to `df1` by extracting the year from the date column. Additionally, in Task 4, the column `amount` is renamed to `transaction_amount`.



In [27]:
# calculate the total transaction value for each year in df1.
total_value_per_year = df1.groupBy('year').agg(sum("transaction_amount").alias("total_transaction_val"))

# show the total transaction value for each year in df1.
total_value_per_year.show()

+----+---------------------+
|year|total_transaction_val|
+----+---------------------+
|2025|                25700|
|2027|                25700|
|2023|                28100|
|2022|                29800|
|2026|                25700|
|2029|                25700|
|2030|                 9500|
|2028|                25700|
|2024|                25700|
+----+---------------------+



#### Task 15: Write the result to HDFS

Write `total_value_per_year` to HDFS in the CSV format to file named **total_value_per_year**.



In [28]:
#Write total_value_per_year to HDFS in the CSV format

total_value_per_year.write.mode("overwrite").csv("total_value_per_year.csv")


This practice project provides hands-on experience with data transformation and integration using PySpark. Various tasks, including adding columns, renaming columns, dropping unnecessary columns, joining dataframes, and writing the results into both a Hive warehouse and an HDFS file system.


## Authors

Alameen Waziri




<!--## Change Log -->


<!--|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-09-01|0.1|Lavanya T S|Initial version|
|2023-09-08|0.2|Pornima More|QA pass with edits|-->


<h3 align="center"> &#169; IBM Corporation. All rights reserved. <h3/>
