In [97]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession, DataFrame

In [57]:
spark = SparkSession.builder.appName("weather_EDA").getOrCreate()

In [58]:
spark

In [59]:
PATH = 'weather_data.csv'

In [60]:
df = spark.read.csv(PATH, header='True', inferSchema='True')

In [61]:
df.show()

+------------+-------------------+-------------------+------------------+-------------------+------------------+
|    Location|          Date_Time|      Temperature_C|      Humidity_pct|   Precipitation_mm|    Wind_Speed_kmh|
+------------+-------------------+-------------------+------------------+-------------------+------------------+
|   San Diego|2024-01-14 21:12:46| 10.683001094715387|41.195753566944475|   4.02011871570867| 8.233540246873023|
|   San Diego|2024-05-17 15:22:10|  8.734139782353598| 58.31910739552024|  9.111623448229377| 27.71516125689249|
|   San Diego|2024-05-11 09:30:59| 11.632436312930885| 38.82017526915946|  4.607511377146035|28.732951288236187|
|Philadelphia|2024-02-26 17:32:39|  -8.62897589569391| 54.07447397596174|  3.183719747807655|26.367302672536635|
| San Antonio|2024-04-29 13:23:51| 39.808212974631594| 72.89990795294305|  9.598282136749663|29.898621669296094|
|   San Diego|2024-01-21 08:54:56| 27.341054869123994| 49.02323606834762|  9.166543302732745| 27

In [62]:
df = df.withColumn('Date_Time', F.to_timestamp(df['Date_Time']))

In [63]:
df.show()

+------------+-------------------+-------------------+------------------+-------------------+------------------+
|    Location|          Date_Time|      Temperature_C|      Humidity_pct|   Precipitation_mm|    Wind_Speed_kmh|
+------------+-------------------+-------------------+------------------+-------------------+------------------+
|   San Diego|2024-01-14 21:12:46| 10.683001094715387|41.195753566944475|   4.02011871570867| 8.233540246873023|
|   San Diego|2024-05-17 15:22:10|  8.734139782353598| 58.31910739552024|  9.111623448229377| 27.71516125689249|
|   San Diego|2024-05-11 09:30:59| 11.632436312930885| 38.82017526915946|  4.607511377146035|28.732951288236187|
|Philadelphia|2024-02-26 17:32:39|  -8.62897589569391| 54.07447397596174|  3.183719747807655|26.367302672536635|
| San Antonio|2024-04-29 13:23:51| 39.808212974631594| 72.89990795294305|  9.598282136749663|29.898621669296094|
|   San Diego|2024-01-21 08:54:56| 27.341054869123994| 49.02323606834762|  9.166543302732745| 27

In [64]:
df.printSchema()

root
 |-- Location: string (nullable = true)
 |-- Date_Time: timestamp (nullable = true)
 |-- Temperature_C: double (nullable = true)
 |-- Humidity_pct: double (nullable = true)
 |-- Precipitation_mm: double (nullable = true)
 |-- Wind_Speed_kmh: double (nullable = true)



In [65]:
# Q1) “Visualize the table (head), schema, and summary stats.”
df.select('Temperature_c', 'Humidity_pct', 'Precipitation_mm', 'Wind_speed_kmh').summary().show()

+-------+-------------------+------------------+--------------------+--------------------+
|summary|      Temperature_c|      Humidity_pct|    Precipitation_mm|      Wind_speed_kmh|
+-------+-------------------+------------------+--------------------+--------------------+
|  count|            1000000|           1000000|             1000000|             1000000|
|   mean| 14.779704927041724|60.021829555539995|   5.109639125976339|  14.997598129281656|
| stddev| 14.482558096887278|17.324022036939365|  2.9479970526543395|   8.663556016588863|
|    min|-19.969311093584516|30.000008666508258|8.904156590938683E-6|5.064808367705176E-5|
|    25%| 2.2683977659423906|45.007143615156224|  2.5803206828077787|   7.489410465672412|
|    50%|  14.77664534994316| 60.01618887921569|   5.109196226835624|  14.993297198467964|
|    75%|  27.26690781548175| 75.04043996513244|   7.613099814913298|  22.513586281793515|
|    max|  39.99980055990208| 89.99997706846489|  14.971582777942082|  29.999972925526304|

In [21]:
null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
null_counts.show()

+--------+---------+-------------+------------+----------------+--------------+
|Location|Date_Time|Temperature_C|Humidity_pct|Precipitation_mm|Wind_Speed_kmh|
+--------+---------+-------------+------------+----------------+--------------+
|       0|        0|            0|           0|               0|             0|
+--------+---------+-------------+------------+----------------+--------------+



In [67]:
# Q2) “Change types that need to be changed; add handy time columns.”
df = (df
      .withColumn("date", F.to_date(df['Date_Time']))
      .withColumn("month", F.month(df['Date_Time']))
      .withColumn("hour", F.hour(df['Date_Time']))
      .withColumn("day_of_week", F.dayofweek(df['Date_Time']))
)

df.printSchema()

root
 |-- Location: string (nullable = true)
 |-- Date_Time: timestamp (nullable = true)
 |-- Temperature_C: double (nullable = true)
 |-- Humidity_pct: double (nullable = true)
 |-- Precipitation_mm: double (nullable = true)
 |-- Wind_Speed_kmh: double (nullable = true)
 |-- date: date (nullable = true)
 |-- month: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)



In [68]:
df.show()

+------------+-------------------+-------------------+------------------+-------------------+------------------+----------+-----+----+-----------+
|    Location|          Date_Time|      Temperature_C|      Humidity_pct|   Precipitation_mm|    Wind_Speed_kmh|      date|month|hour|day_of_week|
+------------+-------------------+-------------------+------------------+-------------------+------------------+----------+-----+----+-----------+
|   San Diego|2024-01-14 21:12:46| 10.683001094715387|41.195753566944475|   4.02011871570867| 8.233540246873023|2024-01-14|    1|  21|          1|
|   San Diego|2024-05-17 15:22:10|  8.734139782353598| 58.31910739552024|  9.111623448229377| 27.71516125689249|2024-05-17|    5|  15|          6|
|   San Diego|2024-05-11 09:30:59| 11.632436312930885| 38.82017526915946|  4.607511377146035|28.732951288236187|2024-05-11|    5|   9|          7|
|Philadelphia|2024-02-26 17:32:39|  -8.62897589569391| 54.07447397596174|  3.183719747807655|26.367302672536635|2024-0

In [69]:
# Q3) “Which locations have the highest max temperature?”
hottest = (
    df.groupby("Location")
    .agg(F.max("Temperature_c").alias("max_temperature"))
    .orderBy(F.col("max_temperature").desc())
).show(5)

+------------+------------------+
|    Location|   max_temperature|
+------------+------------------+
|    New York| 39.99980055990208|
|   San Diego| 39.99969202638841|
|Philadelphia| 39.99964200636102|
| Los Angeles|39.999592117635316|
|    San Jose|39.999014854952236|
+------------+------------------+
only showing top 5 rows


In [70]:
# Q4) “Top 3 locations by average temperature."
top_3 = (df
         .groupby("Location")
         .agg(F.round(F.avg("Temperature_c"), 2).alias("avg_temperature"))
         .orderBy(F.col("avg_temperature").desc())
).show(3)

+------------+---------------+
|    Location|avg_temperature|
+------------+---------------+
| Los Angeles|          15.08|
|Philadelphia|          15.04|
| San Antonio|          15.03|
+------------+---------------+
only showing top 3 rows


In [71]:
# Q5) “Wettest locations (total precipitation) and the single wettest day per location.”

wettest_total = (df
                 .groupBy("Location")
                 .agg(F.round(F.sum("precipitation_mm"), 2).alias("total_precip_mm"))
                 .orderBy(F.col("total_precip_mm").desc())
                 ).show(10)

+------------+---------------+
|    Location|total_precip_mm|
+------------+---------------+
|     Phoenix|      610020.81|
|Philadelphia|       501467.1|
|     Chicago|      501136.93|
| San Antonio|      500618.82|
|     Houston|      500155.54|
| Los Angeles|      500058.77|
|   San Diego|      499527.82|
|      Dallas|      499430.93|
|    New York|      499165.85|
|    San Jose|      498056.57|
+------------+---------------+



In [72]:
from pyspark.sql.window import Window

daily_precip =(df
               .groupBy("Location", "date")
               .agg(F.sum("precipitation_mm").alias("daily_precip_mm"))
               )

window_spec = Window.partitionBy("Location").orderBy(F.desc("daily_precip_mm"), F.asc("date"))

wettest_day_per_location = (
    daily_precip
    .withColumn("rn", F.row_number().over(window_spec))
    .filter(F.col("rn")==F.lit(1))
    .drop("rn")
    .orderBy(F.desc("daily_precip_mm"))
).show(10)

+------------+----------+------------------+
|    Location|      date|   daily_precip_mm|
+------------+----------+------------------+
|     Phoenix|2024-01-21| 6012.865234952282|
| San Antonio|2024-03-18|4087.2380711875057|
|     Chicago|2024-02-28|4020.8120726941524|
| Los Angeles|2024-04-01|4012.5339813462756|
|    San Jose|2024-04-16| 3986.173931330937|
|Philadelphia|2024-01-21|3971.9457953615924|
|   San Diego|2024-03-10|3968.9530238037664|
|      Dallas|2024-04-23| 3956.009131037221|
|     Houston|2024-01-28|  3920.81936818571|
|    New York|2024-02-29|3900.2112073649187|
+------------+----------+------------------+



In [73]:
# Q6) “Windiest location on average, and the windiest hourly record overall.”

df.show(10)


+------------+-------------------+-------------------+------------------+------------------+------------------+----------+-----+----+-----------+
|    Location|          Date_Time|      Temperature_C|      Humidity_pct|  Precipitation_mm|    Wind_Speed_kmh|      date|month|hour|day_of_week|
+------------+-------------------+-------------------+------------------+------------------+------------------+----------+-----+----+-----------+
|   San Diego|2024-01-14 21:12:46| 10.683001094715387|41.195753566944475|  4.02011871570867| 8.233540246873023|2024-01-14|    1|  21|          1|
|   San Diego|2024-05-17 15:22:10|  8.734139782353598| 58.31910739552024| 9.111623448229377| 27.71516125689249|2024-05-17|    5|  15|          6|
|   San Diego|2024-05-11 09:30:59| 11.632436312930885| 38.82017526915946| 4.607511377146035|28.732951288236187|2024-05-11|    5|   9|          7|
|Philadelphia|2024-02-26 17:32:39|  -8.62897589569391| 54.07447397596174| 3.183719747807655|26.367302672536635|2024-02-26|  

In [48]:
windiest_avg= (
    df
    .groupBy("Location")
    .agg(F.avg("Wind_speed_kmh").alias("avg_wind_speed_kmh"))
    .orderBy(F.col("avg_wind_speed_kmh").desc())
).show(10)

+------------+------------------+
|    Location|avg_wind_speed_kmh|
+------------+------------------+
|   San Diego|15.037633062690071|
| Los Angeles|15.035120312286233|
|      Dallas|15.022001476205851|
|     Chicago|14.999500905388548|
|Philadelphia| 14.99066287575462|
|     Houston|14.989935004712333|
|    New York|14.989397750499128|
|     Phoenix|14.983903806245223|
|    San Jose|14.971368507043172|
| San Antonio|14.956578923965553|
+------------+------------------+



In [74]:
# Q7) “For each location, return the top 3 hottest timestamps (window function).”

w = Window.partitionBy("Location").orderBy(F.desc("Temperature_C"), F.asc("Date_Time"))

top3_per_loc = (df
    .withColumn("rn", F.row_number().over(w))
    .where(F.col("rn") <= 3)
    .select("Location","Date_Time","Temperature_C","Humidity_pct","Wind_Speed_kmh"))
top3_per_loc.show(50, truncate=False)


+------------+-------------------+------------------+------------------+------------------+
|Location    |Date_Time          |Temperature_C     |Humidity_pct      |Wind_Speed_kmh    |
+------------+-------------------+------------------+------------------+------------------+
|Chicago     |2024-05-15 00:45:03|39.99856055186906 |33.44063088469898 |3.581761385677761 |
|Chicago     |2024-02-14 09:49:33|39.9981996841916  |78.38402298838685 |23.151472721152206|
|Chicago     |2024-02-07 10:04:49|39.997948370615816|73.12906264395096 |14.70713063078171 |
|Dallas      |2024-04-19 04:49:15|39.998804370831856|54.77511680323569 |8.557635883230544 |
|Dallas      |2024-02-10 06:53:08|39.998210641102844|37.64381505569977 |29.232354446751664|
|Dallas      |2024-04-13 23:50:34|39.99807780154744 |39.455773869151116|28.78312814110111 |
|Houston     |2024-04-16 22:26:42|39.998913308230456|57.93976392669023 |25.132386741681493|
|Houston     |2024-02-05 15:16:56|39.998692637847334|62.13106619341958 |5.584580

In [75]:
# Q8) “Daily average temperature per location and 7‑day rolling average.”

daily_avg_temp_df = (
    df.groupBy("Location")
    .agg(F.avg("Temperature_C").alias("avg_temperature"))
    .orderBy(F.col("avg_temperature").desc())
).show(10)

+------------+------------------+
|    Location|   avg_temperature|
+------------+------------------+
| Los Angeles|15.081062749080642|
|Philadelphia|15.035891898863467|
| San Antonio|15.027760596677592|
|    New York|15.013389431407829|
|     Chicago|15.009207643653685|
|      Dallas|15.009097007201921|
|    San Jose| 14.95610795367727|
|     Houston|14.942985042361983|
|   San Diego|14.933538709596466|
|     Phoenix|12.792479413115531|
+------------+------------------+



In [76]:
w7 = Window.partitionBy("Location").orderBy(F.col("date").asc()).rowsBetween(-6, 0)
daily_rolling =(
    df.withColumn("avg_temp_7d", F.round(F.avg("Temperature_c").over(w7), 2))
    .orderBy("Location", "date")
).show(30)

+--------+-------------------+-------------------+------------------+-------------------+-------------------+----------+-----+----+-----------+-----------+
|Location|          Date_Time|      Temperature_C|      Humidity_pct|   Precipitation_mm|     Wind_Speed_kmh|      date|month|hour|day_of_week|avg_temp_7d|
+--------+-------------------+-------------------+------------------+-------------------+-------------------+----------+-----+----+-----------+-----------+
| Chicago|2024-01-01 00:23:03|-7.6240068336863605| 43.68800604072399|  6.604117218680262| 28.139150937643645|2024-01-01|    1|   0|          2|      -7.62|
| Chicago|2024-01-01 00:28:55|  8.603506121873316| 74.85640542665395|  9.188582092208438|0.21784559567478712|2024-01-01|    1|   0|          2|       0.49|
| Chicago|2024-01-01 07:09:01|  21.18080588303704| 41.69161361763747| 1.3070304107404696|  24.91595127981203|2024-01-01|    1|   7|          2|       7.39|
| Chicago|2024-01-01 08:05:49|  18.94098997956236| 48.2112899865

In [78]:
# Q9) “Monthly precipitation by location in a pivoted table (months as columns).”

monthly_precip = (
    df.groupBy("Location", "month")
    .agg(F.round(F.sum("Precipitation_mm"), 2).alias("total_precip_mm"))
)

pivoted = (
    monthly_precip
    .groupBy("Location")
    .pivot("month", list(range(1, 13)))
    .agg(F.first("total_precip_mm"))
).show(10)

+------------+---------+---------+---------+---------+--------+----+----+----+----+----+----+----+
|    Location|        1|        2|        3|        4|       5|   6|   7|   8|   9|  10|  11|  12|
+------------+---------+---------+---------+---------+--------+----+----+----+----+----+----+----+
|     Phoenix| 169451.8|157881.39|112039.44|106864.44|63783.75|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|      Dallas|111488.25|104997.86|110494.98|108502.67|63947.16|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
| San Antonio|111654.14| 103873.7|111035.83|110044.88|64010.27|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|Philadelphia|112622.49| 102495.4|111976.29|109743.56|64629.36|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
| Los Angeles|111954.18|103976.57|110375.45|108558.87| 65193.7|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|   San Diego|111017.82|104246.23|111513.91|109370.88|63378.98|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|     Chicago|111942.41|103658.33| 112656.7|107304.81|65574.68|NULL|NULL|NULL|NULL|NULL|NULL|NULL|
|    San J

In [81]:
# Q10) “Convert °C to °F and bucket temperatures.”

df_with_f = df.withColumn("Temperature_F", F.round(F.col("Temperature_C")*9/5 +32, 2))


df_binned = df_with_f.withColumn(
    "temp_bucket",
    F.when(F.col("Temperature_c")<0, "below_freezing")
    .when(F.col("Temperature_c") < 10, "cold")
     .when(F.col("Temperature_c") < 20, "mild")
     .when(F.col("Temperature_c") < 30, "warm")
     .otherwise("hot")
)

In [82]:
df_binned.groupBy("Location","temp_bucket").count().orderBy("Location","temp_bucket").show(50, truncate=False)

+------------+--------------+-----+
|Location    |temp_bucket   |count|
+------------+--------------+-----+
|Chicago     |below_freezing|19961|
|Chicago     |cold          |20180|
|Chicago     |hot           |20089|
|Chicago     |mild          |19904|
|Chicago     |warm          |20030|
|Dallas      |below_freezing|20126|
|Dallas      |cold          |19845|
|Dallas      |hot           |19999|
|Dallas      |mild          |19789|
|Dallas      |warm          |20177|
|Houston     |below_freezing|20208|
|Houston     |cold          |19871|
|Houston     |hot           |19806|
|Houston     |mild          |20085|
|Houston     |warm          |20106|
|Los Angeles |below_freezing|19986|
|Los Angeles |cold          |19748|
|Los Angeles |hot           |20231|
|Los Angeles |mild          |19908|
|Los Angeles |warm          |20049|
|New York    |below_freezing|19811|
|New York    |cold          |20149|
|New York    |hot           |19841|
|New York    |mild          |19976|
|New York    |warm          

In [84]:
# cleaning outliers

df_with_f.show(10)


+------------+-------------------+-------------------+------------------+------------------+------------------+----------+-----+----+-----------+-------------+
|    Location|          Date_Time|      Temperature_C|      Humidity_pct|  Precipitation_mm|    Wind_Speed_kmh|      date|month|hour|day_of_week|Temperature_F|
+------------+-------------------+-------------------+------------------+------------------+------------------+----------+-----+----+-----------+-------------+
|   San Diego|2024-01-14 21:12:46| 10.683001094715387|41.195753566944475|  4.02011871570867| 8.233540246873023|2024-01-14|    1|  21|          1|        51.23|
|   San Diego|2024-05-17 15:22:10|  8.734139782353598| 58.31910739552024| 9.111623448229377| 27.71516125689249|2024-05-17|    5|  15|          6|        47.72|
|   San Diego|2024-05-11 09:30:59| 11.632436312930885| 38.82017526915946| 4.607511377146035|28.732951288236187|2024-05-11|    5|   9|          7|        52.94|
|Philadelphia|2024-02-26 17:32:39|  -8.6

In [85]:
df_with_f.printSchema()

root
 |-- Location: string (nullable = true)
 |-- Date_Time: timestamp (nullable = true)
 |-- Temperature_C: double (nullable = true)
 |-- Humidity_pct: double (nullable = true)
 |-- Precipitation_mm: double (nullable = true)
 |-- Wind_Speed_kmh: double (nullable = true)
 |-- date: date (nullable = true)
 |-- month: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- Temperature_F: double (nullable = true)



In [91]:
average_value_temp_c = df_with_f.agg(F.avg(F.col("Temperature_C"))).collect()[0][0]
average_value_humid = df_with_f.agg(F.avg(F.col("Humidity_pct"))).collect()[0][0]
average_value_precip = df_with_f.agg(F.avg(F.col("Precipitation_mm"))).collect()[0][0]
average_value_wind = df_with_f.agg(F.avg(F.col("Wind_Speed_kmh"))).collect()[0][0]
average_value_temp_f = df_with_f.agg(F.avg(F.col("Temperature_F"))).collect()[0][0]

In [92]:
df_with_f = df_with_f.fillna(average_value_temp_c, subset=["Temperature_C"])
df_with_f = df_with_f.fillna(average_value_humid, subset=["Humidity_pct"])
df_with_f = df_with_f.fillna(average_value_precip, subset=["Precipitation_mm"])
df_with_f = df_with_f.fillna(average_value_wind, subset=["Wind_Speed_kmh"])
df_with_f = df_with_f.fillna(average_value_temp_f, subset=["Temperature_F"])


In [94]:
df_with_f.printSchema()

root
 |-- Location: string (nullable = true)
 |-- Date_Time: timestamp (nullable = true)
 |-- Temperature_C: double (nullable = false)
 |-- Humidity_pct: double (nullable = false)
 |-- Precipitation_mm: double (nullable = false)
 |-- Wind_Speed_kmh: double (nullable = false)
 |-- date: date (nullable = true)
 |-- month: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- Temperature_F: double (nullable = false)



In [None]:
# there is an error with approxQuantile, i have to fix it

columns_to_detect = ["Temperature_C", "Humidity_pct", "Precipitation_mm", "Wind_Speed_kmh", "Temperature_F"]
bounds= {}

for col in columns_to_detect:
    quantiles = df_with_f.approxQuantile(col, [0.25, 0.75], 0)

    q1, q3 = quantiles[0], quantiles[1]

    iqr = q3-q1

    lb = q1 - 1.5*iqr
    ub = q3 + 1.5*iqr

    bounds[col] = (lb, ub)

lower_temp_c = bounds["Temperature_C"][0]
lower_humidity_pct = bounds["Humidity_pct"][0]
lower_precip_mm = bounds["Precipitation_mm"][0]
lower_wind_speed_kmh = bounds["Wind_Speed_kmh"][0]
lower_temp_f = bounds["Temperature_F"][0]

upper_temp_c = bounds["Temperature_C"][1]
upper_humidity_pct = bounds["Humidity_pct"][1]
upper_precip_mm = bounds["Precipitation_mm"][1]
upper_wind_speed_kmh = bounds["Wind_Speed_kmh"][1]
upper_temp_f = bounds["Temperature_F"][1]


df_with_f = (
    df_with_f.filter(
        (F.col("Temperature_C").between(lower_temp_c, upper_temp_c)) &
        (F.col("Humidity_pct").between(lower_humidity_pct, upper_humidity_pct)) &
        (F.col("Precipitation_mm").between(lower_precip_mm, upper_precip_mm)) &
        (F.col("Wind_Speed_kmh").between(lower_wind_speed_kmh, upper_wind_speed_kmh)) &
        (F.col("Temperature_F").between(lower_temp_f, upper_temp_f))
    )
)

df_with_f.show(10)