In [0]:
# Load the raw 2015 CSV: the first line is the header and column types are inferred.
# PERMISSIVE mode keeps the load going when a row is malformed (fields that cannot be
# parsed become null). NOTE: with an inferred schema the CSV reader does not keep the
# raw text of malformed rows; that requires a user-supplied schema that declares a
# string corrupt-record column.
df_raw = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("mode", "PERMISSIVE")
    .csv("/Volumes/workspace/assignment/volume/2015.csv")
)
# Preview the first five rows
df_raw.show(5)

+-----------+--------------+--------------+---------------+--------------+------------------------+-------+------------------------+-------+-----------------------------+----------+-----------------+
|    Country|        Region|Happiness Rank|Happiness Score|Standard Error|Economy (GDP per Capita)| Family|Health (Life Expectancy)|Freedom|Trust (Government Corruption)|Generosity|Dystopia Residual|
+-----------+--------------+--------------+---------------+--------------+------------------------+-------+------------------------+-------+-----------------------------+----------+-----------------+
|Switzerland|Western Europe|             1|          7.587|       0.03411|                 1.39651|1.34951|                 0.94143|0.66557|                      0.41978|   0.29678|          2.51738|
|    Iceland|Western Europe|             2|          7.561|       0.04884|                 1.30232|1.40223|                 0.94784|0.62877|                      0.14145|    0.4363|          2.70201|


In [0]:
# a. Column names, in schema order
column_names = df_raw.columns
print("Columns:", column_names)
# b. Total number of rows (count() is an action, so this runs a Spark job)
total_rows = df_raw.count()
print("Row count:", total_rows)
# c. Schema tree: name, type and nullability of every column
df_raw.printSchema()
# d. How many columns the DataFrame has
print("Number of columns:", len(column_names))

Columns: ['Country', 'Region', 'Happiness Rank', 'Happiness Score', 'Standard Error', 'Economy (GDP per Capita)', 'Family', 'Health (Life Expectancy)', 'Freedom', 'Trust (Government Corruption)', 'Generosity', 'Dystopia Residual']
Row count: 158
root
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Happiness Rank: integer (nullable = true)
 |-- Happiness Score: double (nullable = true)
 |-- Standard Error: double (nullable = true)
 |-- Economy (GDP per Capita): double (nullable = true)
 |-- Family: double (nullable = true)
 |-- Health (Life Expectancy): double (nullable = true)
 |-- Freedom: double (nullable = true)
 |-- Trust (Government Corruption): double (nullable = true)
 |-- Generosity: double (nullable = true)
 |-- Dystopia Residual: double (nullable = true)

Number of columns: 12


In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType

# The CSV reader only keeps malformed rows when the schema handed to it declares the
# corrupt-record column. df_raw was read with an inferred schema, so looking for
# "_corrupt_record" in df_raw.columns can never find anything. Re-read the file with
# the inferred fields plus an explicit string field for the raw malformed line.
# Caveat: "malformed" is judged against the types inferred for df_raw.
corrupt_col = "_corrupt_record"
schema_with_corrupt = StructType(
    df_raw.schema.fields + [StructField(corrupt_col, StringType(), True)]
)
df_checked = (
    spark.read
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", corrupt_col)
    .schema(schema_with_corrupt)
    .csv("/Volumes/workspace/assignment/volume/2015.csv")
)

corrupt = df_checked.filter(col(corrupt_col).isNotNull())

# take(1) fetches a full row. Spark rejects queries on raw CSV that reference only the
# corrupt-record column (e.g. filter(...).count()), so count() is deliberately avoided.
if corrupt.take(1):
    print("Corrupted records found:")
    corrupt.show(truncate=False)
else:
    print("No malformed records detected — clean dataset.")

No '_corrupt_record' column detected — no malformed records or clean dataset.


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Explicit schema for 2015.csv, listed as (column name, Spark type) in file order.
# Every field is declared nullable.
field_specs = [
    ("Country", StringType),
    ("Region", StringType),
    ("Happiness Rank", IntegerType),
    ("Happiness Score", DoubleType),
    ("Standard Error", DoubleType),
    ("Economy (GDP per Capita)", DoubleType),
    ("Family", DoubleType),
    ("Health (Life Expectancy)", DoubleType),
    ("Freedom", DoubleType),
    ("Trust (Government Corruption)", DoubleType),
    ("Generosity", DoubleType),
    ("Dystopia Residual", DoubleType),
]
custom_schema = StructType(
    [StructField(field_name, field_type(), True) for field_name, field_type in field_specs]
)

# Re-read the same file, this time applying the declared types instead of inferring them
typed_reader = (
    spark.read
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .schema(custom_schema)
)
df = typed_reader.csv("/Volumes/workspace/assignment/volume/2015.csv")
df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Happiness Rank: integer (nullable = true)
 |-- Happiness Score: double (nullable = true)
 |-- Standard Error: double (nullable = true)
 |-- Economy (GDP per Capita): double (nullable = true)
 |-- Family: double (nullable = true)
 |-- Health (Life Expectancy): double (nullable = true)
 |-- Freedom: double (nullable = true)
 |-- Trust (Government Corruption): double (nullable = true)
 |-- Generosity: double (nullable = true)
 |-- Dystopia Residual: double (nullable = true)



In [0]:
# Spark's round is imported under an alias so that Python's builtin round() is not
# shadowed for the rest of the notebook session.
from pyspark.sql.functions import col, lit, round as spark_round
from pyspark.sql.types import DoubleType

# Build the transformed frame as a single chain; each step returns a new DataFrame
# and `df` itself is left untouched.
df_transformed = (
    df
    # Rename the two columns used below to identifier-friendly names
    .withColumnRenamed("Happiness Score", "Happiness_Score")
    .withColumnRenamed("Economy (GDP per Capita)", "Economy_GDP_per_Capita")
    # Derived column: GDP * Happiness Score, rounded to 2 decimals (example)
    .withColumn(
        "GDP_Happiness",
        spark_round(col("Economy_GDP_per_Capita") * col("Happiness_Score"), 2),
    )
    # Keep only rows with Happiness Score > 5.0 (rows with a null score are dropped too,
    # because a null comparison is not true)
    .filter(col("Happiness_Score") > 5.0)
    # Constant column tagging every row with the dataset name
    .withColumn("dataset_name", lit("World Happiness 2015"))
    # Explicit cast of the score into a separate double-typed column
    .withColumn("Happiness_Score_Double", col("Happiness_Score").cast(DoubleType()))
    # Remove the column that is not needed downstream
    .drop("Standard Error")
)

display(df_transformed.limit(5))

Country,Region,Happiness Rank,Happiness_Score,Economy_GDP_per_Capita,Family,Health (Life Expectancy),Freedom,Trust (Government Corruption),Generosity,Dystopia Residual,GDP_Happiness,dataset_name,Happiness_Score_Double
Switzerland,Western Europe,1,7.587,1.39651,1.34951,0.94143,0.66557,0.41978,0.29678,2.51738,10.6,World Happiness 2015,7.587
Iceland,Western Europe,2,7.561,1.30232,1.40223,0.94784,0.62877,0.14145,0.4363,2.70201,9.85,World Happiness 2015,7.561
Denmark,Western Europe,3,7.527,1.32548,1.36058,0.87464,0.64938,0.48357,0.34139,2.49204,9.98,World Happiness 2015,7.527
Norway,Western Europe,4,7.522,1.459,1.33095,0.88521,0.66973,0.36503,0.34699,2.46531,10.97,World Happiness 2015,7.522
Canada,North America,5,7.427,1.32629,1.32261,0.90563,0.63297,0.32957,0.45811,2.45176,9.85,World Happiness 2015,7.427


In [0]:
# Identify nulls in each column
from pyspark.sql.functions import col, count, when

# One aggregation pass over the data instead of one Spark job per column:
# when(...) yields 1 for a null cell and null otherwise, and count() skips nulls,
# so each aggregate is that column's null count.
null_count_exprs = [
    count(when(col(c).isNull(), 1)).alias(c)
    for c in df_transformed.columns
]
null_count_row = df_transformed.select(null_count_exprs).first()

print("Null counts by column:")
# The aggregate row has one value per column, in the same order as df_transformed.columns
for c, n_null in zip(df_transformed.columns, null_count_row):
    print(f"{c}: {n_null}")

# Fill nulls (if any): empty string for text columns, 0.0 for numeric columns.
# "Happiness Rank" is not listed here, so nulls in that column are left as they are.
df_clean = df_transformed.na.fill({
    "Country": "",
    "Region": "",
    "Happiness_Score": 0.0,
    "Economy_GDP_per_Capita": 0.0,
    "Family": 0.0,
    "Health (Life Expectancy)": 0.0,
    "Freedom": 0.0,
    "Trust (Government Corruption)": 0.0,
    "Generosity": 0.0,
    "Dystopia Residual": 0.0,
    "GDP_Happiness": 0.0,
    "dataset_name": "",
    "Happiness_Score_Double": 0.0
})

Null counts by column:
Country: 0
Region: 0
Happiness Rank: 0
Happiness_Score: 0
Economy_GDP_per_Capita: 0
Family: 0
Health (Life Expectancy): 0
Freedom: 0
Trust (Government Corruption): 0
Generosity: 0
Dystopia Residual: 0
GDP_Happiness: 0
dataset_name: 0
Happiness_Score_Double: 0


In [0]:
# Define the de-duplicated frame up front (lazy) so it can be reused for both the
# duplicate diff and the final count.
df_nodup = df_clean.dropDuplicates()

print("Row count before removing duplicates:", df_clean.count())

# exceptAll preserves multiplicity: removing one copy of every distinct row from
# df_clean leaves exactly the surplus copies of rows that occur more than once.
df_duplicates = df_clean.exceptAll(df_nodup)
print("Duplicate records count:", df_duplicates.count())
display(df_duplicates)

print("After removing duplicates:", df_nodup.count())


Row count before removing duplicates: 93
Duplicate records count: 0


Country,Region,Happiness Rank,Happiness_Score,Economy_GDP_per_Capita,Family,Health (Life Expectancy),Freedom,Trust (Government Corruption),Generosity,Dystopia Residual,GDP_Happiness,dataset_name,Happiness_Score_Double


After removing duplicates: 93


In [0]:
# Destination directory for the cleaned dataset
output_path = "/Volumes/workspace/assignment/volume/happiness_2015_cleaned_parquet"

# Persist as Parquet; "overwrite" replaces whatever an earlier run left at this path
parquet_writer = df_nodup.write.mode("overwrite")
parquet_writer.parquet(output_path)

print(f"Processed data saved to: {output_path}")

# Read the saved files back and show a five-row sample as a check on the write
df_saved = spark.read.parquet(output_path)
df_sample = df_saved.limit(5)
display(df_sample)

Processed data saved to: /Volumes/workspace/assignment/volume/happiness_2015_cleaned_parquet


Country,Region,Happiness Rank,Happiness_Score,Economy_GDP_per_Capita,Family,Health (Life Expectancy),Freedom,Trust (Government Corruption),Generosity,Dystopia Residual,GDP_Happiness,dataset_name,Happiness_Score_Double
Somaliland region,Sub-Saharan Africa,91,5.057,0.18847,0.95152,0.43873,0.46582,0.39928,0.50318,2.11032,0.95,World Happiness 2015,5.057
Trinidad and Tobago,Latin America and Caribbean,41,6.168,1.21183,1.18354,0.61483,0.55884,0.0114,0.31844,2.26882,7.47,World Happiness 2015,6.168
Saudi Arabia,Middle East and Northern Africa,35,6.411,1.39541,1.08393,0.72025,0.31048,0.32524,0.13706,2.43872,8.95,World Happiness 2015,6.411
France,Western Europe,29,6.575,1.27778,1.26038,0.94579,0.55011,0.20646,0.12332,2.21126,8.4,World Happiness 2015,6.575
Algeria,Middle East and Northern Africa,68,5.605,0.93929,1.07772,0.61766,0.28579,0.17383,0.07822,2.43209,5.26,World Happiness 2015,5.605
