In [6]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, when
import kagglehub, glob, os, shutil

# Download the dataset and locate the CSV inside the returned directory.
path = kagglehub.dataset_download("jsphyg/weather-dataset-rattle-package")
csv_path = os.path.join(path, "weatherAUS.csv")

# getOrCreate() returns the already-running session when there is one, so an
# existing session is reused here, not stopped and recreated.
# NOTE(review): spark.driver.memory presumably only takes effect when this call
# actually starts a new driver JVM - restart the kernel to change it.
spark = (
    SparkSession.builder
    .appName("lab9")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Read every column as a string (inferSchema=False); types are assigned
# explicitly below, after the textual missing-value markers are removed.
df = spark.read.csv(csv_path, header=True, inferSchema=False)

num_cols = [
    "MinTemp", "MaxTemp", "Rainfall", "Evaporation", "Sunshine",
    "WindGustSpeed", "WindSpeed9am", "WindSpeed3pm",
    "Humidity9am", "Humidity3pm", "Pressure9am", "Pressure3pm",
    "Cloud9am", "Cloud3pm", "Temp9am", "Temp3pm"
]

# Fail early with a readable message if the CSV layout is not what we expect.
missing_cols = [name for name in num_cols if name not in df.columns]
if missing_cols:
    raise ValueError(f"Expected numeric columns not found in CSV: {missing_cols}")

null_tokens = ["NA", "NaN", "", "None"]
numeric_names = set(num_cols)


def _clean_column(name):
    """Return column `name` with null markers turned into NULL.

    Columns listed in `num_cols` are additionally cast to float.
    """
    cleaned = when(col(name).isin(null_tokens), None).otherwise(col(name))
    if name in numeric_names:
        cleaned = cleaned.cast("float")
    return cleaned.alias(name)


# One select instead of a withColumn call per column: same result, but the
# query plan gets a single projection, not dozens of nested ones.
df = df.select([_clean_column(name) for name in df.columns])

# Rows without the target label cannot be used.
df = df.filter(col("RainTomorrow").isNotNull())

# Median imputation for numeric columns. A single approxQuantile call handles
# all columns; a column that contains only NULLs yields an empty list and is
# left unfilled (indexing [0] on it would raise IndexError).
quantiles = df.approxQuantile(num_cols, [0.5], 0.01)
median_fill = {name: q[0] for name, q in zip(num_cols, quantiles) if q}
if median_fill:
    df = df.na.fill(median_fill)

# Mode imputation for categorical columns. NULL is excluded from the frequency
# count: otherwise NULL itself could be picked as the "mode", and na.fill does
# not accept None as a replacement value. Ties are broken by value so the
# result is deterministic.
cat_cols = [c for c, t in df.dtypes if t == "string" and c not in ["RainTomorrow", "Date"]]
mode_fill = {}
for name in cat_cols:
    top_row = (
        df.where(col(name).isNotNull())
        .groupBy(name)
        .count()
        .orderBy(F.desc("count"), F.asc(name))
        .first()
    )
    if top_row is not None:  # None means the column has no non-null values
        mode_fill[name] = top_row[0]
if mode_fill:
    df = df.na.fill(mode_fill)

df.show(5)
df.printSchema()

Using Colab cache for faster access to the 'weather-dataset-rattle-package' dataset.


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [3]:
pip install pyspark



In [None]:
# Columns summarised in this cell; they are (re)cast to float in one projection
# so the comparisons and statistics below operate on numeric values.
stat_cols = ["Temp3pm", "Humidity3pm", "Rainfall"]
df = df.select([
    col(name).cast("float").alias(name) if name in stat_cols else col(name)
    for name in df.columns
])

# Peek at a narrow projection of the data.
df.select(["Date", "Location", "Temp3pm"]).show(n=5)

# Days whose 3 pm temperature exceeded 30 degrees.
hot_days = df.where(F.col("Temp3pm") > 30)
print("температура в 3PM > 30°C")
hot_days.show(n=5)

# Hottest afternoons first.
print("сортировка по Temp3pm по убыванию")
df.orderBy(F.desc("Temp3pm")).show(n=5)

# Summary statistics for the three numeric columns.
df.describe(stat_cols).show()

+----------+--------+-------+
|      Date|Location|Temp3pm|
+----------+--------+-------+
|2008-12-01|  Albury|   21.8|
|2008-12-02|  Albury|   24.3|
|2008-12-03|  Albury|   23.2|
|2008-12-04|  Albury|   26.5|
|2008-12-05|  Albury|   29.7|
+----------+--------+-------+
only showing top 5 rows

температура в 3PM > 30°C
+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+-----------+--------+--------+-------+-------+---------+------------+
|      Date|Location|MinTemp|MaxTemp|Rainfall|Evaporation|Sunshine|WindGustDir|WindGustSpeed|WindDir9am|WindDir3pm|WindSpeed9am|WindSpeed3pm|Humidity9am|Humidity3pm|Pressure9am|Pressure3pm|Cloud9am|Cloud3pm|Temp9am|Temp3pm|RainToday|RainTomorrow|
+----------+--------+-------+-------+--------+-----------+--------+-----------+-------------+----------+----------+------------+------------+-----------+-----------+-----------+---------

In [None]:
# Export the rows with Temp3pm > 30 as CSV. Spark writes a directory of
# part-*.csv files, so they are merged into one file below.
output_path = "output/lab9"
df.filter(col("Temp3pm") > 30).write.csv(output_path, header=True, mode="overwrite")

# Concatenate the part files in name order. Every part starts with its own
# header row (header=True), so the header is kept only from the first part.
final_file = f"{output_path}.csv"
parts = sorted(glob.glob(f"{output_path}/part-*.csv"))
with open(final_file, "w", encoding="utf-8") as out:
    for i, part_path in enumerate(parts):
        # `with` guarantees each part file is closed after it is copied.
        with open(part_path, encoding="utf-8") as src:
            if i > 0:
                next(src, None)  # skip the repeated header line
            out.writelines(src)

# The browser download is best-effort: it only exists on Google Colab, and a
# failure must not prevent the Spark session from being shut down. Failures
# are reported instead of being silently swallowed.
try:
    from google.colab import files
except ImportError:
    print(f"Not running on Colab; merged file is available at {final_file}")
else:
    try:
        files.download(final_file)
    except Exception as exc:
        print(f"Download of {final_file} failed: {exc}")

spark.stop()

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>