<p style="text-align:center">
    <a href="https://skills.network" target="_blank">
    <img src="https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/assets/logos/SN_web_lightmode.png" width="200" alt="Skills Network Logo">
    </a>
</p>


# Practice Project

Estimated time needed: **60** minutes

This practice project focuses on data transformation and integration using PySpark. You will work with two datasets and perform transformations such as adding columns, renaming columns, dropping unnecessary columns, and joining dataframes. Finally, you will write the results to both a Hive warehouse and an HDFS file system.


### Prerequisites

For this lab assignment, you will use wget, Python, and Spark (PySpark). Make sure the libraries listed below are installed in your lab environment or in Skills Network (SN) Labs.


In [None]:
# Installing required packages
%pip install wget pyspark findspark

#### Prework - Initiate the Spark Session


In [4]:
import findspark

findspark.init()

In [5]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to create the SparkSession.

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

In [67]:
# Creating a SparkContext object

# sc = SparkContext.getOrCreate()

# Creating a Spark Session

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Task 1: Load datasets into PySpark DataFrames

Download the datasets from the following links using `wget` and load each one into a Spark DataFrame.

1. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv
2. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv

_Hint: Import wget_


In [2]:
# download dataset using wget
import wget

link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'

wget.download(link_to_data1, out='customers.csv')
wget.download(link_to_data2, out='customer_transactions.csv')

'customer_transactions.csv'

<details>
    <summary>Click here for Solution</summary>

```python
# download the dataset using wget
import wget

link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
wget.download(link_to_data1)

link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'
wget.download(link_to_data2)
```

</details>


In [None]:
# load the data into a pyspark dataframe
df_customers = spark.read.csv('customers.csv', header=True, inferSchema=True)
df_customers.show()

df_transactions = spark.read.csv('customer_transactions.csv',
                                 header=True, inferSchema=True)
df_transactions.show()

<details>
    <summary>Click here for Solution</summary>

```python

#load the data into a pyspark dataframe

df1 = spark.read.csv("dataset1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("dataset2.csv", header=True, inferSchema=True)
```

</details>


### Task 2: Display the schema of both dataframes

Display the schema of `df1` and `df2` to understand the structure of the datasets.


In [68]:
# print the schema of df1 and df2
df_customers.printSchema()
df_transactions.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- high_value: string (nullable = false)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- transaction_value: integer (nullable = true)
 |-- quarter: integer (nullable = true)



<details>
    <summary>Click here for Solution</summary>

```python

#print the schema of df1 and df2

df1.printSchema()
df2.printSchema()
```

</details>


#### Task 3: Add a new column to each dataframe

Add a new column named **year** to `df1` and **quarter** to `df2` representing the year and quarter of the data.

_Hint: Use withColumn. The date columns are stored as strings, so convert them to dates before extracting the year and quarter._


In [70]:
from pyspark.sql.functions import year, quarter, to_date

# https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
# https://stackoverflow.com/questions/62943941/to-date-fails-to-parse-date-in-spark-3-0

# Add new column year to df1
# Spark 3.x: use 'd/M/yyyy' rather than 'dd/MM/yyyy' because the dates are not zero-padded
df_customers = df_customers.withColumn(
    'year', year(to_date('date_column', 'd/M/yyyy')))

# Add new column quarter to df2
df_transactions = df_transactions.withColumn(
    'quarter', quarter(to_date('transaction_date', 'd/M/yyyy')))

# df_customers.show(2)
df_transactions.show(2)

+-----------+----------------+-----------------+-------+
|customer_id|transaction_date|transaction_value|quarter|
+-----------+----------------+-----------------+-------+
|          1|        1/1/2022|             1500|      1|
|          2|       15/2/2022|             2000|      1|
+-----------+----------------+-----------------+-------+
only showing top 2 rows



<details>
    <summary>Click here for Solution</summary>

```python
from pyspark.sql.functions import year, quarter, to_date

# Add new column year to df1
# The dates are not zero-padded (for example 1/1/2022), so Spark 3.x needs 'd/M/yyyy'
df1 = df1.withColumn('year', year(to_date('date_column', 'd/M/yyyy')))

# Add new column quarter to df2
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date', 'd/M/yyyy')))
```

</details>


#### Task 4: Rename columns in both dataframes

Rename the column **amount** to **transaction_amount** in `df1` and **value** to **transaction_value** in `df2`.

_Hint: Use withColumnRenamed_


In [30]:
df_customers.show(1)
df_transactions.show(1)

+-----------+-----------+------------------+----+
|customer_id|date_column|transaction_amount|year|
+-----------+-----------+------------------+----+
|          1|   1/1/2022|              5000|2022|
+-----------+-----------+------------------+----+
only showing top 1 row

+-----------+----------------+-----------------+-------+
|customer_id|transaction_date|transaction_value|quarter|
+-----------+----------------+-----------------+-------+
|          1|        1/1/2022|             1500|      1|
+-----------+----------------+-----------------+-------+
only showing top 1 row



In [26]:
# Rename df1 column amount to transaction_amount
df_customers = df_customers.withColumnRenamed(
    'amount', 'transaction_amount')

# Rename df2 column value to transaction_value
df_transactions = df_transactions.withColumnRenamed(
    'value', 'transaction_value')

<details>
    <summary>Click here for Solution</summary>

```python

#Rename df1 column amount to transaction_amount
df1 = df1.withColumnRenamed('amount', 'transaction_amount')

#Rename df2 column value to transaction_value
df2 = df2.withColumnRenamed('value', 'transaction_value')
```

</details>


#### Task 5: Drop unnecessary columns

Drop the columns **description** and **location** from `df1` and **notes** from `df2`.


In [29]:
# Drop columns description and location from df1
df_customers = df_customers.drop('description', 'location')

# Drop column notes from df2
df_transactions = df_transactions.drop('notes')

<details>
    <summary>Click here for Solution</summary>

```python

#Drop columns description and location from df1
df1 = df1.drop('description', 'location')

#Drop column notes from df2
df2 = df2.drop('notes')
```

</details>


#### Task 6: Join dataframes based on a common column

Join `df1` and `df2` based on the common column **customer_id** and create a new dataframe named `joined_df`.


In [31]:
# join df1 and df2 based on common column customer_id
joined_df = df_customers.join(
    other=df_transactions, on='customer_id', how='inner')

In [None]:
joined_df.show(2)

<details>
    <summary>Click here for Solution</summary>

```python

#join df1 and df2 based on common column customer_id
joined_df = df1.join(df2, 'customer_id', 'inner')

```

</details>


#### Task 7: Filter data based on a condition

Filter `joined_df` to include only transactions where "transaction_amount" is greater than 1000 and create a new dataframe named `filtered_df`.


In [33]:
# filter the dataframe for transaction amount > 1000
filtered_df = joined_df.filter(joined_df['transaction_amount'] > 1000)

<details>
    <summary>Click here for Solution</summary>

```python

# filter the dataframe for transaction amount > 1000
filtered_df = joined_df.filter("transaction_amount > 1000")
```

</details>


#### Task 8: Aggregate data by customer

Calculate the total transaction amount for each customer in `filtered_df` and display the result.

_Hint: Use sum from pyspark.sql.functions_


In [71]:
from pyspark.sql.functions import sum

# group by customer_id and aggregate the sum of transaction amount
total_amount_per_customer = filtered_df.groupBy('customer_id')\
    .agg(sum('transaction_amount').alias('total_amount'))\
    .orderBy('total_amount', ascending=False)

# display the result in descending order of total transaction amount
total_amount_per_customer.show(5)

+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|          5|        6000|
|         81|        5500|
|         93|        5500|
|         57|        5500|
|         69|        5500|
+-----------+------------+
only showing top 5 rows



<details>
    <summary>Click here for Solution</summary>

```python

from pyspark.sql.functions import sum

# group by customer_id and aggregate the sum of transaction amount

total_amount_per_customer = filtered_df.groupBy('customer_id').agg(sum('transaction_amount').alias('total_amount'))

#display the result
total_amount_per_customer.show()
```

</details>


#### Task 9: Write the result to a Hive table

Write `total_amount_per_customer` to a Hive table named **customer_totals**.


In [44]:
# Write total_amount_per_customer to a Hive table named customer_totals
total_amount_per_customer.write.mode(
    'overwrite').saveAsTable('customer_totals')

<details>
    <summary>Click here for Solution</summary>

```python

# Write total_amount_per_customer to a Hive table named customer_totals
total_amount_per_customer.write.mode("overwrite").saveAsTable("customer_totals")
```

</details>


#### Task 10: Write the filtered data to HDFS

Write `filtered_df` to HDFS in parquet format to a file named **filtered_data**.


In [45]:
# Write filtered_df to HDFS in parquet format file filtered_data.parquet
filtered_df.write.mode('overwrite').parquet('filtered_data.parquet')

<details>
    <summary>Click here for Solution</summary>

```python

#Write filtered_df to HDFS in parquet format file filtered_data

filtered_df.write.mode("overwrite").parquet("filtered_data.parquet")
```

</details>


#### Task 11: Add a new column based on a condition

Add a new column named **high_value** to `df1` indicating whether the transaction_amount is greater than 5000. When the value is greater than 5000, the value of the column should be **Yes**. When the value is less than or equal to 5000, the value of the column should be **No**.

_Hint: Use when and lit from pyspark.sql.functions_


In [46]:
from pyspark.sql.functions import when, lit
# Add new column with value indicating whether transaction amount is > 5000 or not
df_customers = df_customers.withColumn(
    'high_value', when(df_customers['transaction_amount'] > 5000, lit('Yes')).otherwise(lit('No')))

<details>
    <summary>Click here for Solution</summary>

```python

from pyspark.sql.functions import when, lit

# Add new column with value indicating whether transaction amount is > 5000 or not
df1 = df1.withColumn("high_value", when(df1.transaction_amount > 5000, lit("Yes")).otherwise(lit("No")))
```

</details>


#### Task 12: Calculate the average transaction value per quarter

Calculate and display the average transaction value for each quarter in `df2` and create a new dataframe named `average_value_per_quarter` with column `avg_trans_val`.

_Hint: Use avg from pyspark.sql.functions_


In [50]:
from pyspark.sql.functions import avg
# calculate the average transaction value for each quarter in df2
average_value_per_quarter = df_transactions.groupBy('quarter').agg(
    avg('transaction_value').alias('avg_trans_val'))


# show the average transaction value for each quarter in df2
average_value_per_quarter.show()

+-------+------------------+
|quarter|     avg_trans_val|
+-------+------------------+
|      1| 1111.111111111111|
|      3|1958.3333333333333|
|      4| 816.6666666666666|
|      2|            1072.0|
+-------+------------------+



<details>
    <summary>Click here for Solution</summary>

```python
from pyspark.sql.functions import avg

#calculate the average transaction value for each quarter in df2
average_value_per_quarter = df2.groupBy('quarter').agg(avg("transaction_value").alias("avg_trans_val"))


#show the average transaction value for each quarter in df2
average_value_per_quarter.show()

```

</details>


#### Task 13: Write the result to a Hive table

Write `average_value_per_quarter` to a Hive table named **quarterly_averages**.


In [51]:
# Write average_value_per_quarter to a Hive table named quarterly_averages
average_value_per_quarter.write.mode(
    'overwrite').saveAsTable('quarterly_averages')

<details>
    <summary>Click here for Solution</summary>

```python

#Write average_value_per_quarter to a Hive table named quarterly_averages

average_value_per_quarter.write.mode("overwrite").saveAsTable("quarterly_averages")

```

</details>


#### Task 14: Calculate the total transaction value per year

Calculate and display the total transaction value for each year in `df1` and create a new dataframe named `total_value_per_year` with column `total_transaction_val`.


In [59]:
from pyspark.sql.functions import sum
# calculate the total transaction value for each year in df1.
total_value_per_year = df_customers.groupBy('year').agg(
    sum('transaction_amount').alias('total_transaction_val'))

# show the total transaction value for each year in df1.
total_value_per_year.show()

+----+---------------------+
|year|total_transaction_val|
+----+---------------------+
|2025|                25700|
|2027|                25700|
|2023|                28100|
|2022|                29800|
|2026|                25700|
|2029|                25700|
|2030|                 9500|
|2028|                25700|
|2024|                25700|
+----+---------------------+



<details>
    <summary>Click here for Solution</summary>

```python

# calculate the total transaction value for each year in df1.
total_value_per_year = df1.groupBy('year').agg(sum("transaction_amount").alias("total_transaction_val"))

# show the total transaction value for each year in df1.
total_value_per_year.show()

```

</details>


#### Task 15: Write the result to HDFS

Write `total_value_per_year` to HDFS in CSV format, to a file named **total_value_per_year**.


In [66]:
# Write total_value_per_year to HDFS in the CSV format
total_value_per_year.write.mode('overwrite').csv('total_value_per_year.csv')

<details>
    <summary>Click here for Solution</summary>

```python

#Write total_value_per_year to HDFS in the CSV format

total_value_per_year.write.mode("overwrite").csv("total_value_per_year.csv")

```

</details>


### Congratulations! You have completed the lab.

This practice project provides hands-on experience with data transformation and integration using PySpark. You've performed various tasks, including adding columns, renaming columns, dropping unnecessary columns, joining dataframes, and writing the results into both a Hive warehouse and an HDFS file system.


## Authors

Raghul Ramesh

Lavanya T S


## Change Log


| Date (YYYY-MM-DD) | Version | Changed By   | Change Description |
| ----------------- | ------- | ------------ | ------------------ |
| 2023-09-01        | 0.1     | Lavanya T S  | Initial version    |
| 2023-09-08        | 0.2     | Pornima More | QA pass with edits |


Copyright © 2023 IBM Corporation. All rights reserved.
