<a href="https://colab.research.google.com/github/JyotiChowrasia05/PySpark_Steephan/blob/main/PySpark_Steephan.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!pip install PySpark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    coalesce,
    col,
    isnan,
    isnull,
    length,
    lit,
    lower,
    regexp_replace,
    to_date,
    to_timestamp,
    trim,
    upper,
    when,
)
#from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, DateType, TimestampType

In [None]:
# Build the SparkSession used by every later cell (getOrCreate reuses an
# already-running session if one exists).
spark = (
    SparkSession.builder
    .appName("MeteoriteApp")
    .getOrCreate()
)

print("Spark Session Created Successfully!")

In [None]:
# Load the raw meteorite CSV: the first row supplies column names and Spark
# samples the data to infer each column's type.
csv_file_path = "/content/meteorite_landings_raw.csv"

df = spark.read.options(header=True, inferSchema=True).csv(csv_file_path)

print(f"\nDataFrame created from '{csv_file_path}'")

In [None]:
df.__class__  # display the Python class of the loaded DataFrame object

In [None]:
df.show(n=20, truncate=True)  # preview the first 20 rows, long values truncated

In [None]:
df.describe().show(n=20, truncate=True)  # summary statistics (count, mean, stddev, min, max)

In [None]:
# Print the column names and types Spark inferred from the CSV.
df.printSchema()

In [None]:
# --- Data cleaning ---

# Missing values: replace null 'name' entries with the placeholder "na".
df = df.fillna({"name": "na"})

# Exact duplicates: drop rows that repeat every column value, reporting the
# row count before and after.
rows_before = df.count()
print(f"\nOriginal count: {rows_before}")
df = df.dropDuplicates()
rows_after = df.count()
print(f"Count after dropping full row duplicates: {rows_after}")

print("\n--- Final Cleaned DataFrame ---")
df.show(truncate=False)
df.printSchema()

In [None]:
# Flag rows whose 'name' has leading/trailing spaces: trimming such a value
# shortens it, so its length differs from the trimmed length.
# NOTE(review): Spark's trim() is documented as trimming spaces; confirm
# whether tabs/newlines should also count as whitespace here.
raw_name = col("name")
df_checked = df.withColumn(
    "has_lead_trail_ws",
    length(raw_name) != length(trim(raw_name)),
)

print("\nDataFrame with a flag for leading/trailing whitespace:")
df_checked.show()

In [None]:
# Trim leading/trailing spaces from 'name', overwriting the column in place.
trimmed_name = trim(col("name"))
df = df.withColumn("name", trimmed_name)
print("\nDataFrame with 'name' column trimmed:")
df.show()

In [None]:
# Show the Spark data type of 'year' before parsing it. (type(df['year']) would
# always report pyspark's Column class, which says nothing about the contents.)
df.schema["year"].dataType

In [None]:
# Parse the 'year' column into a DateType column named 'converted_date'.
# Each pattern is tried in turn; coalesce keeps the first non-null parse.
# NOTE(review): the first pattern is day-first (dd/MM) but the second is
# month-first (M/d) -- confirm which ordering the raw CSV actually uses.
# NOTE(review): depending on spark.sql.legacy.timeParserPolicy / ANSI mode, a
# value a pattern cannot parse may raise instead of yielding null -- confirm.
date_formats = ["dd/MM/yyyy hh:mm:ss a", "M/d/yyyy H:mm"]

year_col = col("year")
parsed_date_exp = [to_date(year_col, pattern) for pattern in date_formats]

df_converted = df.withColumn("converted_date", coalesce(*parsed_date_exp))

print("\nDataFrame with converted date column (handling multiple formats):")
df_converted.show(truncate=False)
df_converted.printSchema()


In [None]:
# Treat the literal placeholder "(0.000000, 0.000000)" in 'geolocation' as
# missing: replace it with null and pass every other value through unchanged.
zero_coords_string = "(0.000000, 0.000000)"

geo = col("geolocation")
is_zero_point = geo == zero_coords_string
df_geo = df.withColumn(
    "geolocation",
    when(is_zero_point, lit(None)).otherwise(geo),
)

print("\nDataFrame after nullifying specific geolocation strings:")
df_geo.show(truncate=False)
df_geo.printSchema()

In [None]:
# List every distinct value that appears in the 'fall' column.
df_distinct_fall = df.select(col("fall")).distinct()
df_distinct_fall.show()

In [None]:
# Average mass of meteorites whose 'fall' value is "Fell".
# Rows with a null mass are filtered out first, so the printed record count
# matches the rows that feed the average.
is_fell = col("fall") == "Fell"
has_mass = col("mass (g)").isNotNull()
df_fell = df.filter(is_fell & has_mass)

print("\n The records (where column 'fall' has value 'Fell' and column mass is not null):")
print('\n Total records:', df_fell.count())
df_fell.show()

avg_mass = df_fell.select(avg("mass (g)").alias("average_mass_fell"))

print("\nAverage mass where 'fall' is 'Fell':")
avg_mass.show()

In [None]:
# Average mass of meteorites whose 'fall' value is "Found".
# Rows with a null mass are filtered out first, so the printed record count
# matches the rows that feed the average.
is_found = col("fall") == "Found"
has_mass = col("mass (g)").isNotNull()
df_found = df.filter(is_found & has_mass)

print("\n The records (where column 'fall' has value 'Found' and column mass is not null):")
print('\n Total records:', df_found.count())
df_found.show()

avg_mass_found = df_found.select(avg("mass (g)").alias("average_mass_found"))

print("\nAverage mass where 'fall' is 'Found':")
avg_mass_found.show()

In [None]:
# Mark flag_mass as True for each meteorite whose mass is strictly greater than
# the overall average mass across the whole DataFrame.

# Overall average over the rows that actually have a mass value.
df_overall_avg_mass = df.filter(col("mass (g)").isNotNull()).agg(
    avg(col("mass (g)")).alias("overall_avg_mass")
)
df_overall_avg_mass.show()

# A global aggregation yields exactly one row; pull its 'overall_avg_mass'
# field back to the driver as a Python scalar (None if no mass values exist).
overall_avg_mass_value = df_overall_avg_mass.collect()[0]["overall_avg_mass"]
print(f"\nThe Overall Average Meteor Mass: {overall_avg_mass_value}")

# lit() attaches the scalar to every row as a constant column.
df_with_overall_avg = df.withColumn("overall_avg_mass", lit(overall_avg_mass_value))
print("\nDataFrame with the overall average mass column")
df_with_overall_avg.show()

# A null mass or a null average makes the comparison null, which is not true,
# so such rows fall through to False -- the flag itself is never null.
df_result = df_with_overall_avg.withColumn(
    "flag_mass",  # previously "flag_mass " with a stray trailing space
    when(col("mass (g)") > col("overall_avg_mass"), lit(True)).otherwise(lit(False)),
)

print("\nFinal DataFrame with 'flag_mass' column:")
df_result.show()
df_result.printSchema()

In [None]:
# Stop the SparkSession
# spark.stop()
# print("\nSparkSession stopped.")