# Practice Project



This practice project focuses on data transformation and integration using PySpark. I work with two datasets and apply a series of transformations: adding columns, renaming columns, dropping unnecessary columns, and joining dataframes. Finally, I write the results to a Hive warehouse and to HDFS.


### Prerequisites 



 


In [1]:
# Installing required packages

!pip install wget pyspark findspark

Collecting pyspark
  Downloading pyspark-3.4.4.tar.gz (311.4 MB)
  Preparing metadata (setup.py) ... done
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.4-py2.py3-none-any.whl size=311905466 sha256=a35863f740ab71d024c4a50a48d0c1a7716112a2974846f9c6a287b74b33ccbe
  Stored in directory: /home/jupyterlab/.cache/pip/wheels/4e/66/db/939eb1c49afb8a7fd2c4e393ad34e12b77db67bb4cc974c00e
Successfully built pyspark
Installing colle

#### Prework - Initiate the Spark Session


In [2]:
import findspark

findspark.init()

In [None]:

from pyspark import SparkContext

from pyspark.sql import SparkSession

In [None]:

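# Reuse the active SparkContext, or start one if none exists
sc = SparkContext.getOrCreate()

# Build (or reuse) the SparkSession used for all dataframe operations;
# the config line below sets a placeholder option and can be removed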
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Task 1: Load datasets into PySpark DataFrames

Download the two datasets:
1. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv  
2. https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv  



In [None]:
import wget

link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
wget.download(link_to_data1)

link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'
wget.download(link_to_data2)

'dataset2.csv'

In [None]:

df1 = spark.read.csv("dataset1.csv", header=True, inferSchema=True)
df2 = spark.read.csv("dataset2.csv", header=True, inferSchema=True)

### Task 2: Display the schema of both dataframes

Display the schema of `df1` and `df2`.


In [None]:

df1.printSchema()
df2.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- notes: string (nullable = true)



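#### Task 3: Add a new column to each dataframe

Add a column named **year** to `df1`, extracted from **date_column**, and a column named **quarter** to `df2`, extracted from **transaction_date**. Both date columns are strings in `dd/MM/yyyy` format.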





In [12]:
from pyspark.sql.functions import year, quarter, to_date

# Add a year column to df1, parsed from the dd/MM/yyyy string in date_column
df1 = df1.withColumn('year', year(to_date('date_column','dd/MM/yyyy')))

# Add a quarter column to df2, parsed from the dd/MM/yyyy string in transaction_date
df2 = df2.withColumn('quarter', quarter(to_date('transaction_date','dd/MM/yyyy')))
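
The `'dd/MM/yyyy'` pattern passed to `to_date` means day, then month, then four-digit year. As a quick sanity check outside Spark, the same parsing and quarter arithmetic can be sketched in plain Python. This is only an illustration: the helper name `year_and_quarter` and the sample dates are made up here, and `%d/%m/%Y` is the `datetime` analogue of the Spark pattern, not the same parser.

```python
from datetime import datetime

def year_and_quarter(date_string):
    """Parse a 'dd/MM/yyyy' string and return (year, quarter)."""
    parsed = datetime.strptime(date_string, "%d/%m/%Y")
    # Months 1-3 -> quarter 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4
    quarter = (parsed.month - 1) // 3 + 1
    return parsed.year, quarter

# Hypothetical sample values, not taken from the datasets
print(year_and_quarter("15/08/2023"))  # (2023, 3)
print(year_and_quarter("01/12/2024"))  # (2024, 4)
```

Here a string that does not match the pattern raises `ValueError`. In Spark with default settings, a mismatch typically produces a null date instead, so it can be worth checking `year` and `quarter` for nulls.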





#### Task 4: Rename columns in both dataframes

Rename the column **amount** to **transaction_amount** in `df1` and **value** to **transaction_value** in `df2`.



In [None]:
df1 = df1.withColumnRenamed('amount', 'transaction_amount')
    
df2 = df2.withColumnRenamed('value', 'transaction_value')

df1.printSchema()
df2.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- date_column: string (nullable = true)
 |-- transaction_amount: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)
 |-- year: integer (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- transaction_value: integer (nullable = true)
 |-- notes: string (nullable = true)
 |-- quarter: integer (nullable = true)



#### Task 5: Drop unnecessary columns

Drop the columns **description** and **location** from `df1` and **notes** from `df2`.




In [None]:
df1 = df1.drop('description', 'location')
    
df2 = df2.drop('notes')

#### Task 6: Join dataframes based on a common column

Join `df1` and `df2` based on the common column **customer_id** and create a new dataframe named `joined_df`.




In [None]:
joined_df = df1.join(df2, 'customer_id', 'inner')
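
An inner join keeps only the `customer_id` values present in both dataframes, and it emits one output row for every matching pair. If a customer appears several times on each side, the row count multiplies, which also affects any total computed on the joined data later. The plain-Python sketch below illustrates that behaviour on made-up rows; `inner_join` is a hypothetical helper for illustration, not a Spark API.

```python
def inner_join(left_rows, right_rows, key):
    """Return one merged dict per matching (left, right) pair."""
    joined = []
    for left in left_rows:
        for right in right_rows:
            if left[key] == right[key]:
                joined.append({**left, **right})
    return joined

# Hypothetical rows, not taken from the datasets
left = [
    {"customer_id": 1, "transaction_amount": 2000},
    {"customer_id": 1, "transaction_amount": 3000},
    {"customer_id": 2, "transaction_amount": 500},
]
right = [
    {"customer_id": 1, "transaction_value": 10},
    {"customer_id": 1, "transaction_value": 20},
    {"customer_id": 3, "transaction_value": 30},
]

result = inner_join(left, right, "customer_id")

# Add up the amounts with an explicit loop
total = 0
for row in result:
    total += row["transaction_amount"]

print(len(result))  # 4: customer 1 matches 2 x 2 times; customers 2 and 3 drop out
print(total)        # 10000, although the left side only holds 5000 for customer 1
```

Comparing `df1.count()`, `df2.count()`, and `joined_df.count()` is one way to see whether this multiplication happens on the real data.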



#### Task 7: Filter data based on a condition

Filter `joined_df` to include only transactions where **transaction_amount** is greater than 1000 and create a new dataframe named `filtered_df`.





In [None]:
filtered_df = joined_df.filter("transaction_amount > 1000")    


#### Task 8: Aggregate data by customer

Calculate the total transaction amount for each customer in `filtered_df` and display the result.




In [None]:

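# Note: this import shadows Python's built-in sum() for the rest of the notebook
from pyspark.sql.functions import sum

# Sum transaction_amount for each customer
total_amount_per_customer = filtered_df.groupBy('customer_id').agg(sum('transaction_amount').alias('total_amount'))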

total_amount_per_customer.show()




+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|         31|        3200|
|         85|        1800|
|         78|        1500|
|         34|        1200|
|         81|        5500|
|         28|        2600|
|         76|        2600|
|         27|        4200|
|         91|        3200|
|         22|        1200|
|         93|        5500|
|          1|        5000|
|         52|        2600|
|         13|        4800|
|          6|        4500|
|         16|        2600|
|         40|        2600|
|         94|        1200|
|         57|        5500|
|         54|        1500|
+-----------+------------+
only showing top 20 rows




#### Task 9: Write the result to a Hive table

Write `total_amount_per_customer` to a Hive table named **customer_totals**.


In [None]:

total_amount_per_customer.write.mode("overwrite").saveAsTable("customer_totals")


#### Task 10: Write the filtered data to HDFS

Write `filtered_df` to HDFS in Parquet format under the name **filtered_data.parquet**. Spark writes Parquet output as a directory of part files rather than a single file.


In [None]:
filtered_df.write.mode("overwrite").parquet("filtered_data.parquet")


#### Task 11: Add a new column based on a condition

Add a new column named **high_value** to `df1` indicating whether **transaction_amount** is greater than 5000. The column should be **Yes** when the amount is greater than 5000 and **No** when it is less than or equal to 5000.


In [None]:
from pyspark.sql.functions import when, lit

df1 = df1.withColumn("high_value", when(df1.transaction_amount > 5000, lit("Yes")).otherwise(lit("No")))
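
The `when(...).otherwise(...)` expression is a row-wise if/else. One detail worth knowing: a comparison against a null amount evaluates to null in Spark, so the `when` branch does not fire and the row falls through to `otherwise`. The sketch below mirrors that rule in plain Python, using `None` to stand in for null; the function name `high_value_flag` and the sample amounts are hypothetical.

```python
def high_value_flag(transaction_amount):
    """Return 'Yes' for amounts above 5000, otherwise 'No'.

    None stands in for a Spark null: the comparison is not true,
    so the value falls through to the 'otherwise' branch.
    """
    if transaction_amount is not None and transaction_amount > 5000:
        return "Yes"
    return "No"

# Hypothetical amounts, including the boundary value and a missing value
for amount in [5500, 5000, 1200, None]:
    print(amount, high_value_flag(amount))
```

Note that the boundary value 5000 maps to **No**, since the condition is strictly greater than.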


#### Task 12: Calculate the average transaction value per quarter

Calculate the average transaction value for each quarter in `df2`, store it in a new dataframe named `average_value_per_quarter` with the column `avg_trans_val`, and display the result.


In [None]:
from pyspark.sql.functions import avg

average_value_per_quarter = df2.groupBy('quarter').agg(avg("transaction_value").alias("avg_trans_val"))

    
average_value_per_quarter.show()




+-------+------------------+
|quarter|     avg_trans_val|
+-------+------------------+
|      1| 1111.111111111111|
|      3|1958.3333333333333|
|      4| 816.6666666666666|
|      2|            1072.0|
+-------+------------------+
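
The grouped average divides each quarter's sum of `transaction_value` by its row count. The result is a double even when the inputs are integers, which explains values such as `1111.111111111111` above, and nulls are left out of the calculation. A plain-Python sketch of the same computation follows; the helper `average_by_key` and the rows are hypothetical and only illustrate the arithmetic.

```python
from collections import defaultdict

def average_by_key(rows, key, value):
    """Group rows by `key` and return {key: mean of `value`}."""
    totals = defaultdict(lambda: [0, 0])  # key -> [running sum, row count]
    for row in rows:
        if row[value] is None:
            continue  # like Spark's avg, skip nulls
        totals[row[key]][0] += row[value]
        totals[row[key]][1] += 1
    return {k: s / n for k, (s, n) in totals.items()}

# Hypothetical rows, not taken from dataset2.csv
rows = [
    {"quarter": 1, "transaction_value": 1000},
    {"quarter": 1, "transaction_value": 1500},
    {"quarter": 2, "transaction_value": 800},
    {"quarter": 2, "transaction_value": None},
]
print(average_by_key(rows, "quarter", "transaction_value"))
# {1: 1250.0, 2: 800.0}
```

Spark does not guarantee row order after `groupBy`, which is why the quarters above appear as 1, 3, 4, 2. Adding `.orderBy('quarter')` before `.show()` would give a stable order.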




#### Task 13: Write the result to a Hive table

Write `average_value_per_quarter` to a Hive table named **quarterly_averages**.


In [None]:

average_value_per_quarter.write.mode("overwrite").saveAsTable("quarterly_averages")



#### Task 14: Calculate the total transaction value per year

Calculate the total transaction value for each year in `df1`, store it in a new dataframe named `total_value_per_year` with the column `total_transaction_val`, and display the result.




In [None]:
# sum here is pyspark.sql.functions.sum, imported in Task 8
total_value_per_year = df1.groupBy('year').agg(sum("transaction_amount").alias("total_transaction_val"))

total_value_per_year.show()


+----+---------------------+
|year|total_transaction_val|
+----+---------------------+
|2025|                25700|
|2027|                25700|
|2023|                28100|
|2022|                29800|
|2026|                25700|
|2029|                25700|
|2030|                 9500|
|2028|                25700|
|2024|                25700|
+----+---------------------+


