In [None]:
from pyspark.sql import SparkSession

# Step 1: Obtain the notebook's Spark session (reused if one is already running)
spark = (
    SparkSession.builder
    .appName("CSV to Parquet")
    .getOrCreate()
)

# Step 2: Load the raw CSV.
# The first row supplies the column names and column types are inferred
# from the data.
csv_file_path = "/content/drive/MyDrive/climate_dataset/GlobalLandTemperaturesByCity.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(csv_file_path)
)

# Step 3: Redistribute the rows across 4 partitions
df = df.repartition(4)

# Step 4: Save the data in Parquet format under "output_folder",
# replacing whatever an earlier run left there
df.write.parquet("output_folder", mode="overwrite")

In [None]:
from pyspark.sql import SparkSession

# Step 1: Reuse the running SparkSession, or start one if none exists yet
spark = (
    SparkSession.builder
    .appName("Read Parquet File")
    .getOrCreate()
)

# Step 2: Load the Parquet data stored under "output_folder"
df_parquet = spark.read.format("parquet").load("output_folder")

# Step 3: Preview the first rows of the DataFrame
df_parquet.show()


+----------+------------------+-----------------------------+-----------------+--------------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|             City|       Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----------------+--------------+--------+---------+
|1848-09-01|            27.435|                        1.042|          Barisal|    Bangladesh|  23.31N|   90.00E|
|1838-10-01|              NULL|                         NULL|         Cikampek|     Indonesia|   7.23S|  107.84E|
|1994-03-01|            19.507|                         1.12|         Buraydah|  Saudi Arabia|  26.52N|   44.78E|
|1888-11-01|            25.916|          0.46299999999999997|         Carúpano|     Venezuela|  10.45N|   63.00W|
|1857-05-01|            26.017|                        1.193|          Cotonou|         Benin|   7.23N|    2.43E|
|1890-06-01|            23.257|                        1.141|       Cape Coast|         

In [None]:
# Sort by AverageTemperature, coldest first, without letting NULLs hide the data.
# Spark's default ascending sort places NULLs first, so the preview used to
# consist only of missing readings. asc_nulls_last() keeps every row but
# moves the NULL temperatures to the end of the ordering.
df_parquet = df_parquet.sort(df_parquet["AverageTemperature"].asc_nulls_last())
df_parquet.show()

+----------+------------------+-----------------------------+--------------+-------------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|          City|      Country|Latitude|Longitude|
+----------+------------------+-----------------------------+--------------+-------------+--------+---------+
|1778-07-01|              NULL|                         NULL|       Detroit|United States|  42.59N|   82.91W|
|1814-05-01|              NULL|                         NULL|         Bidar|        India|  18.48N|   77.75E|
|1750-11-01|              NULL|                         NULL|        Almere|  Netherlands|  52.24N|    5.26E|
|1755-07-01|              NULL|                         NULL|    Des Moines|United States|  40.99N|   93.73W|
|1890-07-01|              NULL|                         NULL|         Cusco|         Peru|  13.66S|   71.83W|
|1747-06-01|              NULL|                         NULL|   Chattanooga|United States|  34.56N|   85.62W|
|1824-08-0

In [None]:
# Keep only the rows in which every column has a value.
# (Despite its name, null_df is the frame *without* NULLs.)
null_df = df_parquet.dropna()
null_df.show()

+----------+-------------------+-----------------------------+----------+-------+--------+---------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|      City|Country|Latitude|Longitude|
+----------+-------------------+-----------------------------+----------+-------+--------+---------+
|1979-02-01| -42.70399999999999|                        0.972|   Norilsk| Russia|  69.92N|   88.83E|
|1893-01-01|-41.101000000000006|                          1.5|     Kyzyl| Russia|  52.24N|   94.60E|
|1966-02-01|-39.919000000000004|                        0.968|   Norilsk| Russia|  69.92N|   88.83E|
|1974-01-01|            -39.683|                        1.298|   Norilsk| Russia|  69.92N|   88.83E|
|1979-01-01|            -39.403|                        0.387|   Norilsk| Russia|  69.92N|   88.83E|
|1919-01-01|-39.038000000000004|                          1.5|     Kyzyl| Russia|  52.24N|   94.60E|
|1872-01-01|            -38.951|                        6.653|     Kyzyl| Russia|  52.24N| 

In [None]:
# Total number of rows in null_df; as the cell's last expression the count
# is displayed as the cell output
null_df.count()

8235082

In [None]:
# Preview the 15 rows with the lowest AverageTemperature
null_df.orderBy("AverageTemperature").show(n=15)

+----------+-------------------+-----------------------------+-------+-------+--------+---------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|   City|Country|Latitude|Longitude|
+----------+-------------------+-----------------------------+-------+-------+--------+---------+
|1979-02-01| -42.70399999999999|                        0.972|Norilsk| Russia|  69.92N|   88.83E|
|1893-01-01|-41.101000000000006|                          1.5|  Kyzyl| Russia|  52.24N|   94.60E|
|1966-02-01|-39.919000000000004|                        0.968|Norilsk| Russia|  69.92N|   88.83E|
|1974-01-01|            -39.683|                        1.298|Norilsk| Russia|  69.92N|   88.83E|
|1979-01-01|            -39.403|                        0.387|Norilsk| Russia|  69.92N|   88.83E|
|1919-01-01|-39.038000000000004|                          1.5|  Kyzyl| Russia|  52.24N|   94.60E|
|1872-01-01|            -38.951|                        6.653|  Kyzyl| Russia|  52.24N|   94.60E|
|1969-01-01|        

In [None]:
from pyspark.sql.functions import col

# Sanity check: list any reading that falls outside the closed range [-50, 50],
# i.e. strictly below -50 or strictly above 50
null_df.where(~col("AverageTemperature").between(-50, 50)).show()


+---+------------------+-----------------------------+----+-------+--------+---------+
| dt|AverageTemperature|AverageTemperatureUncertainty|City|Country|Latitude|Longitude|
+---+------------------+-----------------------------+----+-------+--------+---------+
+---+------------------+-----------------------------+----+-------+--------+---------+



In [None]:
from pyspark.sql.functions import mean, stddev, col
import matplotlib.pyplot as plt
import pandas as pd

# Spark's column-wise abs(); aliased so Python's builtin abs() is not shadowed.
from pyspark.sql.functions import abs as spark_abs

Z_THRESHOLD = 3      # a reading is an anomaly when |z_score| exceeds this
MAX_ANOMALIES = 500  # upper bound on the rows collected to the driver for plotting

# -----------------------------------------
# STEP 1: Filter for Pakistan
# -----------------------------------------
pakistan_df = null_df.filter(col("Country") == "Pakistan")

# -----------------------------------------
# STEP 2: Calculate Mean and Std Deviation (for Pakistan only)
# -----------------------------------------
stats = pakistan_df.select(
    mean("AverageTemperature").alias("mean_temp"),
    stddev("AverageTemperature").alias("stddev_temp")
).first()

mean_temp = stats["mean_temp"]
stddev_temp = stats["stddev_temp"]

# The aggregates are None when no row matched, and a z-score is undefined
# when the spread is missing, NaN or zero. Fail with a clear message instead
# of a formatting TypeError or a column full of NULL z-scores.
if mean_temp is None or stddev_temp is None or not stddev_temp > 0:
    raise ValueError(
        "Cannot compute z-scores for Pakistan: no rows matched or the "
        "temperature standard deviation is undefined/zero"
    )

print(f"[PAKISTAN] Mean Temp: {mean_temp:.2f}, Std Dev: {stddev_temp:.2f}")

# -----------------------------------------
# STEP 3: Add z_score Column
# -----------------------------------------
pakistan_df_with_z = pakistan_df.withColumn(
    "z_score",
    (col("AverageTemperature") - mean_temp) / stddev_temp
)

# -----------------------------------------
# STEP 4: Filter Anomalies
# -----------------------------------------
anomalies_df = pakistan_df_with_z.filter(spark_abs(col("z_score")) > Z_THRESHOLD)

# -----------------------------------------
# STEP 5: Convert to Pandas (safe limit)
# -----------------------------------------
# Order by |z_score| so that, if the cap applies, the most extreme readings in
# BOTH directions are kept. Ordering by the signed z_score would discard the
# cold anomalies first.
anomalies_pd = anomalies_df.select(
    "dt", "AverageTemperature", "z_score", "City", "Country"
).orderBy(spark_abs(col("z_score")).desc()).limit(MAX_ANOMALIES).toPandas()

anomalies_pd["dt"] = pd.to_datetime(anomalies_pd["dt"])

# -----------------------------------------
# STEP 6: Plot Anomalies for Pakistan
# -----------------------------------------
plt.figure(figsize=(12, 6))
plt.scatter(anomalies_pd["dt"], anomalies_pd["AverageTemperature"], color='red', label='Anomalies')
plt.axhline(mean_temp, color='green', linestyle='--', label='Mean Temperature (Pakistan)')

plt.title("Temperature Anomalies in Pakistan (Z-score > 3 or < -3)")
plt.xlabel("Date")
plt.ylabel("Average Temperature (°C)")
plt.xticks(rotation=45)
plt.legend()
plt.tight_layout()
plt.show()


In [None]:
import gradio as gr
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean as _mean, stddev as _stddev
import matplotlib.pyplot as plt
import pandas as pd

# Initialize Spark
spark = SparkSession.builder \
    .appName("Temperature Anomalies") \
    .getOrCreate()

# Load your dataset (replace with your path if different)
df = spark.read.parquet("output_folder")

# Distinct country names for the dropdown.
# distinct() returns rows in no particular order, so sort them to make the
# dropdown easy to scan, and skip a NULL country (it cannot be selected by
# equality and would break sorting).
countries = sorted(
    row["Country"]
    for row in df.select("Country").distinct().collect()
    if row["Country"] is not None
)

# Function to calculate and plot anomalies
def plot_anomalies(country):
    """Plot the z-score of AverageTemperature over time for one country.

    The z-score is computed against the mean and standard deviation of all
    rows of the module-level ``df`` whose "Country" equals ``country``.

    Args:
        country: Value of the "Country" column to plot (the dropdown choice).

    Returns:
        Path of a PNG file containing the rendered plot.

    Raises:
        gr.Error: If no country was selected, or if the selection has no
            usable temperature data (no rows, or zero/undefined spread).
    """
    import os
    import tempfile

    if not country:
        raise gr.Error("Please select a country.")

    # Filter for selected country
    country_df = df.filter(col("Country") == country)

    # Calculate mean and stddev
    stats = country_df.select(
        _mean("AverageTemperature").alias("mean"),
        _stddev("AverageTemperature").alias("std")
    ).collect()[0]
    mean_val, std_val = stats["mean"], stats["std"]

    # Without a mean and a positive spread the z-score is undefined.
    # `not std_val > 0` also catches NaN.
    if mean_val is None or std_val is None or not std_val > 0:
        raise gr.Error(f"Not enough temperature data to compute anomalies for {country}.")

    # Calculate anomaly (z-score)
    anomalies_df = country_df.withColumn("Anomaly", (col("AverageTemperature") - mean_val) / std_val)

    # Convert to Pandas
    # NOTE(review): a country with several cities has several rows per date, so
    # the line connects every city's reading in date order - confirm this is
    # intended rather than one aggregated value per date.
    pdf = anomalies_df.select("dt", "Anomaly").orderBy("dt").toPandas()
    pdf['dt'] = pd.to_datetime(pdf['dt'])

    # Each request gets its own file so concurrent requests cannot overwrite
    # one another's image (a single fixed path is shared by all callers).
    fd, plot_path = tempfile.mkstemp(prefix="anomaly_plot_", suffix=".png")
    os.close(fd)

    # Plot on an explicit figure instead of pyplot's implicit "current figure",
    # which is global state shared between handler threads.
    fig, ax = plt.subplots(figsize=(10, 4))
    try:
        ax.plot(pdf['dt'], pdf['Anomaly'], label='Anomaly (z-score)', color='purple')
        ax.axhline(0, color='black', linestyle='--', linewidth=1)
        ax.set_title(f'Temperature Anomalies for {country}')
        ax.set_xlabel('Date')
        ax.set_ylabel('Anomaly (Z-Score)')
        fig.tight_layout()

        # Save plot to image
        fig.savefig(plot_path)
    finally:
        # Always release the figure, even if plotting or saving failed.
        plt.close(fig)
    return plot_path

# Gradio UI: a country dropdown feeds plot_anomalies, and the image file it
# returns is displayed as the result
interface = gr.Interface(
    plot_anomalies,
    gr.Dropdown(choices=countries, label="Select Country"),
    gr.Image(type="filepath"),
    title="Temperature Anomaly Viewer",
    description="Select a country to view its temperature anomalies over time (Z-score)",
)

# Start serving the app
interface.launch()


It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://54c2bd6d42ccc5d421.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [None]:
# Imports
from pyspark.sql.functions import month, year, avg
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
import gradio as gr

# Column helpers needed by the Spark-side anomaly detection in Step 4.
from pyspark.sql.functions import col, expr

# Step 0: Spark session (only needed if not already created)
spark = SparkSession.builder.appName("ClimatePatternApp").getOrCreate()

# Step 1: Load and clean data (replace path as needed)
df = spark.read.parquet("output_folder")  # Adjust file path
df = df.na.drop()

# Step 2: Add month and year
df = df.withColumn("month", month("dt")).withColumn("year", year("dt"))

# Step 3: Monthly and Yearly Patterns
monthly_pattern_df = df.groupBy("Country", "month") \
                       .agg(avg("AverageTemperature").alias("AvgTemp")) \
                       .orderBy("Country", "month")

yearly_trend_df = df.groupBy("Country", "year") \
                    .agg(avg("AverageTemperature").alias("YearlyAvgTemp")) \
                    .orderBy("Country", "year")

# Step 4: Anomaly Detection (IQR method per country)
# Collecting every row with toPandas() exhausted the driver heap
# (java.lang.OutOfMemoryError). The quartiles and the outlier filter are
# therefore evaluated in Spark, and only the outlier rows are collected.
# `percentile` is Spark's exact percentile aggregate.
iqr_bounds_df = (
    df.groupBy("Country")
    .agg(
        expr("percentile(AverageTemperature, 0.25)").alias("q1"),
        expr("percentile(AverageTemperature, 0.75)").alias("q3"),
    )
    .withColumn("iqr", col("q3") - col("q1"))
    .withColumn("lower", col("q1") - 1.5 * col("iqr"))
    .withColumn("upper", col("q3") + 1.5 * col("iqr"))
    .select("Country", "lower", "upper")
)

# Attach each country's bounds to its rows and keep only the readings that
# fall outside [lower, upper].
iqr_anomalies_df = (
    df.select("dt", "AverageTemperature", "Country")
    .join(iqr_bounds_df, on="Country")
    .filter(
        (col("AverageTemperature") < col("lower"))
        | (col("AverageTemperature") > col("upper"))
    )
    .select("dt", "AverageTemperature", "Country")
)

anomalies_pd = iqr_anomalies_df.toPandas()

# Step 5: Convert patterns to Pandas
# These are per-country aggregates (one row per country/month and per
# country/year), so they are small enough to collect.
monthly_pattern_pd = monthly_pattern_df.toPandas()
yearly_trend_pd = yearly_trend_df.toPandas()

# Step 6: Gradio Function
def show_patterns(country):
    """Build a three-panel climate figure for one country.

    Reads the module-level pandas frames ``monthly_pattern_pd``,
    ``yearly_trend_pd`` and ``anomalies_pd`` and plots, left to right: the
    average temperature per month, the average temperature per year, and the
    readings flagged as anomalies.

    Args:
        country: Value of the "Country" column to display.

    Returns:
        The matplotlib Figure holding the three panels.
    """
    fig, axes = plt.subplots(1, 3, figsize=(21, 5))
    try:
        # Monthly Pattern
        monthly = monthly_pattern_pd[monthly_pattern_pd["Country"] == country]
        axes[0].plot(monthly["month"], monthly["AvgTemp"], marker='o')
        axes[0].set_title(f"Monthly Climate Pattern - {country}")
        axes[0].set_xlabel("Month")
        axes[0].set_ylabel("Avg Temp (°C)")
        axes[0].set_xticks(range(1, 13))  # label every month instead of fractional ticks
        axes[0].grid(True)

        # Yearly Trend
        yearly = yearly_trend_pd[yearly_trend_pd["Country"] == country]
        axes[1].plot(yearly["year"], yearly["YearlyAvgTemp"], marker='x', color='orange')
        axes[1].set_title(f"Yearly Temp Trend - {country}")
        axes[1].set_xlabel("Year")
        axes[1].set_ylabel("Avg Temp (°C)")
        axes[1].grid(True)

        # Anomalies
        anom = anomalies_pd[anomalies_pd["Country"] == country]
        axes[2].plot(anom["dt"], anom["AverageTemperature"], 'ro', markersize=4)
        axes[2].set_title(f"Detected Anomalies - {country}")
        axes[2].set_xlabel("Date")
        axes[2].set_ylabel("Anomalous Temp (°C)")
        axes[2].grid(True)

        fig.tight_layout()
        return fig
    finally:
        # pyplot keeps a reference to every figure it creates; without this
        # each request would leave one more figure alive in the kernel.
        # Closing only unregisters it from pyplot - the returned Figure
        # object can still be rendered by the caller.
        plt.close(fig)

# Step 7: Build the dropdown choices (alphabetical), wire the UI to
# show_patterns, and start the Gradio app
country_list = sorted(monthly_pattern_pd["Country"].unique())

gr.Interface(
    show_patterns,
    gr.Dropdown(choices=country_list, label="Select Country"),
    gr.Plot(label="Climate Insights"),
    title="🌍 Climate Anomalies & Patterns Visualizer",
    description="Select a country to view monthly weather pattern, yearly trends, and temperature anomalies.",
).launch()


Py4JJavaError: An error occurred while calling o528.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 119.0 failed 1 times, most recent failure: Lost task 0.0 in stage 119.0 (TID 183) (fbd16ade332a executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4148)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4145)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.OutOfMemoryError: Java heap space
