In [1]:
import findspark

In [2]:
# Run findspark's setup before the `import pyspark` in the next cell.
# NOTE(review): presumably this locates the local Spark install and adds it
# to sys.path so pyspark becomes importable — confirm for this environment.
findspark.init()

In [3]:
import pyspark

In [22]:
from pyspark.sql import SparkSession, functions as func

In [5]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [6]:
# Obtain the notebook's Spark session (an existing one is reused if present),
# registered under the application name "FindTemperature".
spark = (
    SparkSession.builder
    .appName("FindTemperature")
    .getOrCreate()
)

2023-04-11 23:22:43,105 WARN util.Utils: Your hostname, shiva-life resolves to a loopback address: 127.0.1.1; using 192.168.1.4 instead (on interface wlo1)
2023-04-11 23:22:43,106 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-04-11 23:22:44,108 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
# Explicit schema applied when the CSV is read: station ID, date (stored as
# an integer), measurement type, and the temperature reading.
# Every column is declared nullable.
schema = (
    StructType()
    .add("stationID", StringType(), True)
    .add("date", IntegerType(), True)
    .add("measure_type", StringType(), True)
    .add("temperature", FloatType(), True)
)

In [11]:
# Load the CSV into a DataFrame, applying the explicit schema rather than
# letting Spark infer column types.
csv_path = "file:///home/bigdata/workspace/Spark_resources/1800.csv"
df = spark.read.csv(csv_path, schema=schema)

In [12]:
# Sanity check: print the column names, types and nullability of the loaded DataFrame.
df.printSchema()

root
 |-- stationID: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: float (nullable = true)



In [13]:
# Keep only the rows whose measurement type is TMIN; all other entries are dropped.
is_tmin = df["measure_type"] == "TMIN"
minTemps = df.where(is_tmin)

In [14]:
# Preview the leading rows of the TMIN-only DataFrame.
minTemps.show()



+-----------+--------+------------+-----------+
|  stationID|    date|measure_type|temperature|
+-----------+--------+------------+-----------+
|ITE00100554|18000101|        TMIN|     -148.0|
|EZE00100082|18000101|        TMIN|     -135.0|
|ITE00100554|18000102|        TMIN|     -125.0|
|EZE00100082|18000102|        TMIN|     -130.0|
|ITE00100554|18000103|        TMIN|      -46.0|
|EZE00100082|18000103|        TMIN|      -73.0|
|ITE00100554|18000104|        TMIN|      -13.0|
|EZE00100082|18000104|        TMIN|      -74.0|
|ITE00100554|18000105|        TMIN|       -6.0|
|EZE00100082|18000105|        TMIN|      -58.0|
|ITE00100554|18000106|        TMIN|       13.0|
|EZE00100082|18000106|        TMIN|      -57.0|
|ITE00100554|18000107|        TMIN|       10.0|
|EZE00100082|18000107|        TMIN|      -50.0|
|ITE00100554|18000108|        TMIN|       14.0|
|EZE00100082|18000108|        TMIN|      -31.0|
|ITE00100554|18000109|        TMIN|       23.0|
|EZE00100082|18000109|        TMIN|     

In [15]:
# Narrow the TMIN rows down to the two columns the aggregation needs.
wanted_columns = ("stationID", "temperature")
stationTemps = minTemps.select(*wanted_columns)

In [16]:
# Preview the leading rows of the (stationID, temperature) projection.
stationTemps.show()

+-----------+-----------+
|  stationID|temperature|
+-----------+-----------+
|ITE00100554|     -148.0|
|EZE00100082|     -135.0|
|ITE00100554|     -125.0|
|EZE00100082|     -130.0|
|ITE00100554|      -46.0|
|EZE00100082|      -73.0|
|ITE00100554|      -13.0|
|EZE00100082|      -74.0|
|ITE00100554|       -6.0|
|EZE00100082|      -58.0|
|ITE00100554|       13.0|
|EZE00100082|      -57.0|
|ITE00100554|       10.0|
|EZE00100082|      -50.0|
|ITE00100554|       14.0|
|EZE00100082|      -31.0|
|ITE00100554|       23.0|
|EZE00100082|      -46.0|
|ITE00100554|       31.0|
|EZE00100082|      -75.0|
+-----------+-----------+
only showing top 20 rows



In [17]:
# Compute each station's lowest recorded temperature and display the result.
# The aggregate column comes out named "min(temperature)".
grouped_by_station = stationTemps.groupBy("stationID")
minTempsByStation = grouped_by_station.min("temperature")
minTempsByStation.show()

+-----------+----------------+
|  stationID|min(temperature)|
+-----------+----------------+
|ITE00100554|          -148.0|
|EZE00100082|          -135.0|
+-----------+----------------+



In [33]:
# Convert each station's minimum temperature to Fahrenheit (rounded to two
# decimals) and sort the result by temperature, coldest first.
# NOTE(review): the 0.1 factor suggests the raw readings are tenths of a
# degree Celsius — confirm against the dataset's documentation.
minTempsByStationF = (
    minTempsByStation
    .withColumn(
        "temperature",
        func.round(func.col("min(temperature)") * 0.1 * (9.0 / 5.0) + 32.0, 2),
    )
    # Drop the raw "min(temperature)" column; keep only the converted value.
    .select("stationID", "temperature")
    # The original chain never sorted despite the stated intent, and row
    # order after a groupBy is not guaranteed; sort explicitly.
    .sort("temperature")
)

In [34]:
# Display the per-station minimum temperatures after conversion to Fahrenheit.
minTempsByStationF.show()

+-----------+-----------+
|  stationID|temperature|
+-----------+-----------+
|ITE00100554|       5.36|
|EZE00100082|        7.7|
+-----------+-----------+



In [26]:
# Collect the per-station results into a local Python variable; formatting
# and printing happen in the next cell.
results = minTempsByStationF.collect()

In [27]:
# Emit one line per station: the station ID, a tab, then the temperature
# formatted to two decimals with an "F" suffix.
for station_id, temp_f in results:
    line = station_id + "\t{:.2f}F".format(temp_f)
    print(line)

ITE00100554	5.36F
EZE00100082	7.70F
