In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [2]:
# Build the Spark session for this notebook; getOrCreate() reuses an already-active session.
spark = (
    SparkSession.builder
    .appName("minTemperaturedDF")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/18 22:11:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Explicit schema for the CSV so Spark does not have to infer column types.
schema = StructType(
    [
        StructField("stationId", StringType(), nullable=True),
        StructField("date", IntegerType(), nullable=True),  # e.g. 18000101 (yyyymmdd)
        StructField("measure_type", StringType(), nullable=True),  # e.g. "TMIN"
        StructField("temperature", FloatType(), nullable=True),
    ]
)

In [4]:
# Load the raw observations, applying `schema` rather than inferring column types.
inputDf = spark.read.csv("1800.csv", schema=schema)

In [5]:
# Confirm the loaded columns carry the names and types from the explicit schema.
inputDf.printSchema()

root
 |-- stationId: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- measure_type: string (nullable = true)
 |-- temperature: float (nullable = true)



In [6]:
# Keep only the rows whose measurement type is TMIN (minimum temperature).
minTemps = inputDf.where(func.col("measure_type") == "TMIN")
minTemps.show()

+-----------+--------+------------+-----------+
|  stationId|    date|measure_type|temperature|
+-----------+--------+------------+-----------+
|ITE00100554|18000101|        TMIN|     -148.0|
|EZE00100082|18000101|        TMIN|     -135.0|
|ITE00100554|18000102|        TMIN|     -125.0|
|EZE00100082|18000102|        TMIN|     -130.0|
|ITE00100554|18000103|        TMIN|      -46.0|
|EZE00100082|18000103|        TMIN|      -73.0|
|ITE00100554|18000104|        TMIN|      -13.0|
|EZE00100082|18000104|        TMIN|      -74.0|
|ITE00100554|18000105|        TMIN|       -6.0|
|EZE00100082|18000105|        TMIN|      -58.0|
|ITE00100554|18000106|        TMIN|       13.0|
|EZE00100082|18000106|        TMIN|      -57.0|
|ITE00100554|18000107|        TMIN|       10.0|
|EZE00100082|18000107|        TMIN|      -50.0|
|ITE00100554|18000108|        TMIN|       14.0|
|EZE00100082|18000108|        TMIN|      -31.0|
|ITE00100554|18000109|        TMIN|       23.0|
|EZE00100082|18000109|        TMIN|     

In [7]:
# Narrow to the two columns the per-station aggregation needs.
stationTemps = minTemps.select(func.col("stationId"), func.col("temperature"))
stationTemps.show()

+-----------+-----------+
|  stationId|temperature|
+-----------+-----------+
|ITE00100554|     -148.0|
|EZE00100082|     -135.0|
|ITE00100554|     -125.0|
|EZE00100082|     -130.0|
|ITE00100554|      -46.0|
|EZE00100082|      -73.0|
|ITE00100554|      -13.0|
|EZE00100082|      -74.0|
|ITE00100554|       -6.0|
|EZE00100082|      -58.0|
|ITE00100554|       13.0|
|EZE00100082|      -57.0|
|ITE00100554|       10.0|
|EZE00100082|      -50.0|
|ITE00100554|       14.0|
|EZE00100082|      -31.0|
|ITE00100554|       23.0|
|EZE00100082|      -46.0|
|ITE00100554|       31.0|
|EZE00100082|      -75.0|
+-----------+-----------+
only showing top 20 rows



In [9]:
# Lowest TMIN reading per station; the aggregate column comes out named "min(temperature)".
minTempByStation = stationTemps.groupBy("stationId").agg(func.min("temperature"))
minTempByStation.show()

+-----------+----------------+
|  stationId|min(temperature)|
+-----------+----------------+
|ITE00100554|          -148.0|
|EZE00100082|          -135.0|
+-----------+----------------+



In [12]:
minTempByStationF = (
    minTempByStation
    # Raw readings presumably are tenths of a degree (-148.0 -> -14.8); scale and round to 2 dp.
    .withColumn("temperature", func.round(func.col("min(temperature)") * 0.1, 2))
    # Drop the raw aggregate column and order coldest station first.
    .select("stationId", "temperature")
    .orderBy("temperature")
)

In [13]:
# Display the scaled per-station minimums, sorted ascending by temperature.
minTempByStationF.show()

+-----------+-----------+
|  stationId|temperature|
+-----------+-----------+
|ITE00100554|      -14.8|
|EZE00100082|      -13.5|
+-----------+-----------+



In [15]:
# Pull the aggregated result (one row per station) to the driver and print a plain-text report.
results = minTempByStationF.collect()

for result in results:
    # Access fields by name so the report does not depend on column order.
    station_id = result["stationId"]
    temperature = result["temperature"]
    # Either field may be null (a null-stationId group, or a group whose temperatures are all
    # null); format those explicitly instead of failing on `None + str` / `format(None, ".2f")`.
    temp_text = "n/a" if temperature is None else f"{temperature:.2f}C"
    print(f"{station_id}\t{temp_text}")

ITE00100554	-14.80C
EZE00100082	-13.50C


In [16]:
# Shut down the Spark session now that the analysis is finished.
spark.stop()