# Practice Project - Data Transformation with PySpark

This practice project focuses on data transformation and integration using PySpark. You will work with two datasets and perform transformations such as adding, renaming, and dropping columns, joining dataframes, filtering and aggregating data, and writing the results to tables and files.


In [None]:
# Installing required packages
!pip install wget pyspark findspark

In [None]:
# Initialise findspark before the pyspark imports that follow.
# NOTE(review): findspark.init() presumably makes the local Spark/pyspark
# installation importable — confirm against the findspark documentation.
import findspark
findspark.init()

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, quarter, to_date, when, lit, sum, avg, col
from pyspark.sql.types import *

In [None]:
# Obtain the SparkContext; getOrCreate() hands back the existing one if any
sc = SparkContext.getOrCreate()

# Build (or reuse) the SparkSession for this project, with the
# spark.sql.adaptive.enabled setting explicitly turned off
spark = (
    SparkSession.builder
    .appName("PySpark Data Transformation Project")
    .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)

print("Spark session created successfully")

### Task 1: Load datasets into PySpark DataFrames


In [None]:
# Download the datasets using wget (safe to re-run)
import os

import wget

link_to_data1 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset1.csv'
link_to_data2 = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/dataset2.csv'

# (progress message, source URL, filename the loading cell expects)
downloads = (
    ("Downloading dataset1.csv...", link_to_data1, "dataset1.csv"),
    ("\nDownloading dataset2.csv...", link_to_data2, "dataset2.csv"),
)

for message, url, target in downloads:
    print(message)
    # wget.download() never overwrites an existing file: on a re-run it saves
    # the new copy under a suffixed name such as "dataset1 (1).csv" and returns
    # that name. Without the move below, duplicates pile up and the loader
    # keeps reading the stale first copy.
    saved_as = wget.download(url)
    if saved_as != target:
        # Replace the old copy only after the new download has succeeded
        os.replace(saved_as, target)

print("\nDatasets downloaded successfully")

In [None]:
# Read both CSV files into DataFrames; the first row is the header and
# column types are inferred from the data
csv_reader_options = {"header": True, "inferSchema": True}
df1 = spark.read.csv("dataset1.csv", **csv_reader_options)
df2 = spark.read.csv("dataset2.csv", **csv_reader_options)

print("DataFrames loaded successfully")

# Report row and column counts (count() triggers a Spark job per dataframe)
df1_rows, df1_cols = df1.count(), len(df1.columns)
print(f"df1 shape: {df1_rows} rows, {df1_cols} columns")
df2_rows, df2_cols = df2.count(), len(df2.columns)
print(f"df2 shape: {df2_rows} rows, {df2_cols} columns")

### Task 2: Display the schema of both dataframes


In [None]:
# Print the schema of each dataframe, df1 first
for heading, frame in (("Schema of df1:", df1), ("\nSchema of df2:", df2)):
    print(heading)
    frame.printSchema()

# Then preview the first five rows of each, in the same order
for heading, frame in (("\nSample data from df1:", df1), ("\nSample data from df2:", df2)):
    print(heading)
    frame.show(5)

### Task 3: Add a new column to each dataframe


In [None]:
# List the available columns before deriving anything from them
print("Columns in df1:", df1.columns)
print("Columns in df2:", df2.columns)

# df1 gets a 'year' column derived from the first column whose name contains 'date'
date_columns_df1 = [name for name in df1.columns if 'date' in name.lower()]
if not date_columns_df1:
    print("No date column found in df1")
else:
    date_col_df1 = date_columns_df1[0]
    # The source values are parsed with the M/d/yyyy pattern
    parsed_date_df1 = to_date(date_col_df1, 'M/d/yyyy')
    df1 = df1.withColumn('year', year(parsed_date_df1))
    print(f"Added year column to df1 using {date_col_df1}")

# df2 gets a 'quarter' column derived the same way
date_columns_df2 = [name for name in df2.columns if 'date' in name.lower()]
if not date_columns_df2:
    print("No date column found in df2")
else:
    date_col_df2 = date_columns_df2[0]
    # The source values are parsed with the M/d/yyyy pattern
    parsed_date_df2 = to_date(date_col_df2, 'M/d/yyyy')
    df2 = df2.withColumn('quarter', quarter(parsed_date_df2))
    print(f"Added quarter column to df2 using {date_col_df2}")

# Show the resulting schemas
print("\nUpdated df1 schema:")
df1.printSchema()
print("\nUpdated df2 schema:")
df2.printSchema()

### Task 4: Rename columns in both dataframes


In [None]:
# df1: 'amount' becomes 'transaction_amount' (reported if the column is absent)
if 'amount' not in df1.columns:
    print("'amount' column not found in df1")
else:
    df1 = df1.withColumnRenamed('amount', 'transaction_amount')
    print("Renamed 'amount' to 'transaction_amount' in df1")

# df2: 'value' becomes 'transaction_value' (reported if the column is absent)
if 'value' not in df2.columns:
    print("'value' column not found in df2")
else:
    df2 = df2.withColumnRenamed('value', 'transaction_value')
    print("Renamed 'value' to 'transaction_value' in df2")

# Show the column lists after renaming
print("\nUpdated columns in df1:", df1.columns)
print("Updated columns in df2:", df2.columns)

### Task 5: Drop unnecessary columns


In [None]:
# df1: remove 'description' and 'location', limited to those actually present
columns_to_drop_df1 = [name for name in ('description', 'location') if name in df1.columns]
if not columns_to_drop_df1:
    print("No columns to drop from df1")
else:
    df1 = df1.drop(*columns_to_drop_df1)
    print(f"Dropped {columns_to_drop_df1} from df1")

# df2: remove 'notes' when it exists
if 'notes' not in df2.columns:
    print("'notes' column not found in df2")
else:
    df2 = df2.drop('notes')
    print("Dropped 'notes' from df2")

# Show what is left in each dataframe
print("\nFinal columns in df1:", df1.columns)
print("Final columns in df2:", df2.columns)

### Task 6: Join dataframes based on a common column


In [None]:
# Inner-join df1 and df2 on customer_id, provided both dataframes carry that column
both_have_customer_id = all('customer_id' in frame.columns for frame in (df1, df2))
if not both_have_customer_id:
    print("Warning: customer_id column not found in both dataframes")
    print("df1 columns:", df1.columns)
    print("df2 columns:", df2.columns)
    # Fall back to df1 so the following cells still have a joined_df to work on
    joined_df = df1
else:
    joined_df = df1.join(df2, on='customer_id', how='inner')
    print("Successfully joined df1 and df2 on customer_id")
    print("Joined dataframe columns:", joined_df.columns)
    print("Joined dataframe count:", joined_df.count())
    joined_df.show(5)

### Task 7: Filter data based on a condition


In [None]:
# Keep only the rows whose transaction_amount exceeds 1000
if 'transaction_amount' not in joined_df.columns:
    print("Warning: transaction_amount column not found")
    print("Available columns:", joined_df.columns)
    # Fall back to the unfiltered join so the following cells still run
    filtered_df = joined_df
else:
    amount_condition = "transaction_amount > 1000"
    filtered_df = joined_df.filter(amount_condition)
    print("Filtered dataframe for transaction_amount > 1000")
    print("Filtered dataframe count:", filtered_df.count())
    filtered_df.show(5)

### Task 8: Aggregate data by customer


In [None]:
# Sum transaction_amount per customer_id over the filtered rows
columns_needed_for_totals = {'customer_id', 'transaction_amount'}
if not columns_needed_for_totals.issubset(filtered_df.columns):
    print("Warning: Required columns not found for aggregation")
    print("Available columns:", filtered_df.columns)
    # Fallback: the first five rows of filtered_df stand in for the result
    total_amount_per_customer = filtered_df.limit(5)
else:
    # `sum` here is the pyspark.sql.functions aggregate imported at the top
    # of the notebook, not Python's builtin
    total_amount_expr = sum('transaction_amount').alias('total_amount')
    total_amount_per_customer = filtered_df.groupBy('customer_id').agg(total_amount_expr)
    print("Total amount per customer:")
    total_amount_per_customer.show()

### Task 9: Write the result to a Hive table


In [None]:
# Save total_amount_per_customer as the table customer_totals, replacing any
# existing table; a failure is reported instead of stopping the notebook
try:
    customer_totals_writer = total_amount_per_customer.write.mode("overwrite")
    customer_totals_writer.saveAsTable("customer_totals")
except Exception as e:
    # Only the first 200 characters of the error text are shown
    error_text = str(e)[:200]
    print(f"Warning: Could not save to Hive table. Error: {error_text}...")
    print("This is expected in a local environment without Hive setup")
else:
    print("Successfully saved data to Hive table: customer_totals")

### Task 10: Write the filtered data to HDFS


In [None]:
# Save filtered_df as parquet; if that fails, try CSV instead
try:
    # A relative local path is used here rather than an HDFS location;
    # coalesce(1) reduces the data to one partition before writing
    parquet_source = filtered_df.coalesce(1)
    parquet_source.write.mode("overwrite").parquet("filtered_data.parquet")
except Exception as e:
    parquet_error = str(e)[:200]
    print(f"Warning: Could not save to parquet. Error: {parquet_error}...")
    # Second attempt: same data as CSV with a header row
    try:
        csv_source = filtered_df.coalesce(1)
        csv_source.write.mode("overwrite").csv("filtered_data_csv", header=True)
    except Exception as e2:
        csv_error = str(e2)[:100]
        print(f"Could not save as CSV either: {csv_error}...")
    else:
        print("Successfully saved filtered data as CSV: filtered_data_csv")
else:
    print("Successfully saved filtered data to parquet file: filtered_data.parquet")

### Task 11: Add a new column based on a condition


In [None]:
# Flag each df1 row as "Yes"/"No" depending on whether transaction_amount exceeds 5000
if 'transaction_amount' not in df1.columns:
    print("Warning: transaction_amount column not found in df1")
    print("Available columns:", df1.columns)
else:
    exceeds_5000 = col("transaction_amount") > 5000
    high_value_flag = when(exceeds_5000, lit("Yes")).otherwise(lit("No"))
    df1 = df1.withColumn("high_value", high_value_flag)
    print("Added high_value column to df1")
    df1.select("transaction_amount", "high_value").show(10)

### Task 12: Calculate the average transaction value per quarter


In [None]:
# Average transaction_value per quarter in df2
columns_needed_for_quarters = {'quarter', 'transaction_value'}
if not columns_needed_for_quarters.issubset(df2.columns):
    print("Warning: Required columns not found for quarter aggregation")
    print("Available columns in df2:", df2.columns)
    # Fallback: the first five rows of df2 stand in for the result
    average_value_per_quarter = df2.limit(5)
else:
    quarterly_avg_expr = avg("transaction_value").alias("avg_trans_val")
    average_value_per_quarter = df2.groupBy('quarter').agg(quarterly_avg_expr)
    print("Average transaction value per quarter:")
    average_value_per_quarter.show()

### Task 13: Write the result to a Hive table


In [None]:
# Save average_value_per_quarter as the table quarterly_averages, replacing
# any existing table; a failure is reported instead of stopping the notebook
try:
    quarterly_writer = average_value_per_quarter.write.mode("overwrite")
    quarterly_writer.saveAsTable("quarterly_averages")
except Exception as e:
    # Only the first 200 characters of the error text are shown
    error_text = str(e)[:200]
    print(f"Warning: Could not save to Hive table. Error: {error_text}...")
    print("This is expected in a local environment without Hive setup")
else:
    print("Successfully saved data to Hive table: quarterly_averages")

### Task 14: Calculate the total transaction value per year


In [None]:
# Total transaction_amount per year in df1
columns_needed_for_years = {'year', 'transaction_amount'}
if not columns_needed_for_years.issubset(df1.columns):
    print("Warning: Required columns not found for year aggregation")
    print("Available columns in df1:", df1.columns)
    # Fallback: the first five rows of df1 stand in for the result
    total_value_per_year = df1.limit(5)
else:
    # `sum` here is the pyspark.sql.functions aggregate imported at the top
    # of the notebook, not Python's builtin
    yearly_total_expr = sum("transaction_amount").alias("total_transaction_val")
    total_value_per_year = df1.groupBy('year').agg(yearly_total_expr)
    print("Total transaction value per year:")
    total_value_per_year.show()

### Task 15: Write the result to HDFS


In [None]:
# Save total_value_per_year as CSV with a header row, replacing earlier output;
# a failure is reported instead of stopping the notebook
try:
    # coalesce(1) reduces the data to one partition before writing
    yearly_source = total_value_per_year.coalesce(1)
    yearly_source.write.mode("overwrite").csv("total_value_per_year.csv", header=True)
except Exception as e:
    # Only the first 200 characters of the error text are shown
    error_text = str(e)[:200]
    print(f"Warning: Could not save to CSV. Error: {error_text}...")
else:
    print("Successfully saved data to CSV file: total_value_per_year.csv")

### Congratulations! You have completed the lab.

This practice project provides hands-on experience with data transformation and integration using PySpark. You've performed various tasks including:
- Loading and examining data
- Adding and renaming columns
- Dropping unnecessary columns
- Joining dataframes
- Filtering and aggregating data
- Writing results to different formats


In [None]:
# Stop the Spark session (`spark` is the SparkSession built at the start of
# the notebook); cells that use `spark` should not be run after this one.
spark.stop()
print("Spark session stopped successfully")