In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [2]:
# Create (or reuse) the Spark session that every later cell runs against.
spark = (
    SparkSession.builder
    .appName("maxTemp-df")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/05 20:36:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Explicit schema for the weather CSV: station ID, date (stored as an integer),
# measurement type (e.g. "TMAX"), and the measured value. Supplying it means the
# columns are read with these types instead of as plain strings.
# NOTE(review): the Fahrenheit conversion later in the notebook treats
# `temperature` as tenths of a degree Celsius — confirm against the data source.
# No backslash continuations needed: the open bracket continues the line.
schema = StructType([
    StructField("stationID", StringType(), True),
    StructField("date", IntegerType(), True),
    StructField("measure_type", StringType(), True),
    StructField("temperature", FloatType(), True),
])

In [4]:
# Load the observations with the explicit schema defined above, then print
# the resulting schema as a quick sanity check of column names and types.
input_path = "1800.csv"
df = spark.read.csv(input_path, schema=schema)
df.printSchema()

root
 |-- stationID: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: float (nullable = true)



In [5]:
# Keep only rows whose measure_type is "TMAX"; every other measurement type is dropped.
maxTemps = df.where(func.col("measure_type") == "TMAX")

In [6]:
# Narrow to just the two columns the per-station aggregation needs.
stationTemps = maxTemps.select(func.col("stationID"), func.col("temperature"))

In [7]:
# Highest TMAX reading per station. Spark names the aggregate column
# "max(temperature)"; the Fahrenheit-conversion cell refers to it by that name.
maxTempsByStation = stationTemps.groupBy("stationID").agg(func.max("temperature"))
maxTempsByStation.show()

+-----------+----------------+
|  stationID|max(temperature)|
+-----------+----------------+
|ITE00100554|           323.0|
|EZE00100082|           323.0|
+-----------+----------------+



In [8]:
# Convert each station's maximum to Fahrenheit (rounded to 2 decimals) and
# order stations by that value, ascending. The formula scales the raw value
# by 0.1 before applying C→F, i.e. it treats it as tenths of a degree Celsius.
raw_max = func.col("max(temperature)")
max_fahrenheit = func.round(raw_max * 0.1 * (9.0 / 5.0) + 32.0, 2)

maxTempsByStationF = (
    maxTempsByStation
    .withColumn("temperature", max_fahrenheit)
    .select("stationID", "temperature")
    .sort("temperature")
)

In [9]:
# Bring the converted results back to the driver as a list of Rows (one per
# station, so the list stays small). Formatting and printing happen in the
# next cell.
results = maxTempsByStationF.collect()

In [10]:
# Print one line per station: "<stationID>\t<max °F>F".
# Both columns are declared nullable in the schema, so a station group may be
# null and a max over only-null readings is null. Guard both instead of letting
# string concatenation / float formatting raise TypeError mid-report.
for result in results:
    station_id = result["stationID"]
    temp_f = result["temperature"]
    temp_text = "N/A" if temp_f is None else f"{temp_f:.2f}F"
    print(f"The maximum result is: {station_id}\t{temp_text}")

# All results are already on the driver, so the session can be released.
spark.stop()

The maximum result is: ITE00100554	90.14F
The maximum result is: EZE00100082	90.14F
