In [29]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as funct
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [30]:
# Start the Spark session for this notebook. getOrCreate() hands back an
# already-active session instead of building a second one.
sparkSession = (
    SparkSession.builder
    .appName("CustomSchema")
    .getOrCreate()
)

In [31]:
# Explicit schema for the weather CSV: field name and Spark type for each
# column, in file order. StructField is nullable by default, so every column
# accepts nulls.
# NOTE(review): Temperature appears to be tenths of a degree Celsius (the
# Fahrenheit conversion later in the notebook scales it by 0.1) -- confirm.
schemaC = StructType([
    StructField("Station_ID", StringType()),
    StructField("date", IntegerType()),
    StructField("MeasureType", StringType()),
    StructField("Temperature", FloatType()),
])

In [32]:
# Load the weather CSV using the explicit schema rather than type inference,
# then show the resulting column types as a sanity check.
df = sparkSession.read.schema(schemaC).csv("1800.csv")
df.printSchema()


root
 |-- Station_ID: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- MeasureType: string (nullable = true)
 |-- Temperature: float (nullable = true)



In [33]:
# Keep only the rows whose measurement type is a daily minimum (TMIN).
minTempt = df.where(df["MeasureType"] == 'TMIN')

In [34]:
# Project down to the two columns the aggregation needs.
stationTemps = minTempt.select(["Station_ID", "Temperature"])

In [35]:
# Collapse to one row per station holding its lowest TMIN reading, exposed
# as column "MinT", and render the result table.
minTempStationWise = (
    stationTemps
    .groupBy("Station_ID")
    .agg(funct.min(funct.col("Temperature")).alias("MinT"))
)
minTempStationWise.show()

+-----------+------+
| Station_ID|  MinT|
+-----------+------+
|ITE00100554|-148.0|
|EZE00100082|-135.0|
+-----------+------+



In [36]:
# Convert each station's minimum to Fahrenheit.
# The raw value is first scaled by 0.1, so MinT is treated as tenths of a
# degree Celsius (NOTE(review): confirm against the dataset's documentation).
# Then F = C * 9/5 + 32 is applied and the result is rounded once, to 2
# decimals. The previous version also rounded MinT to an integer *before*
# scaling; that double rounding silently discarded any fractional part.
# Sorted ascending on F_Temp, so the coldest station comes first.
minTempStationWiseF = (
    minTempStationWise
    .withColumn(
        "F_Temp",
        funct.round(funct.col("MinT") * 0.1 * (9 / 5) + 32, 2),
    )
    .select("Station_ID", "MinT", "F_Temp")
    .sort("F_Temp")
)

In [37]:
# Pull the sorted result back to the driver as a plain Python list of Rows.
# The aggregate has one row per station, so collecting it in full is cheap.
resultData = list(minTempStationWiseF.collect())

In [38]:
# Readable report: one line per station (coldest first), instead of the raw
# list-of-Row repr. F_Temp can be null when a station's minimum is null, since
# every schema column is nullable and min over all-null values is null, so
# guard the numeric format against None.
if not resultData:
    print("No TMIN readings found.")
for row in resultData:
    f_temp = "n/a" if row.F_Temp is None else f"{row.F_Temp:.2f}F"
    print(f"{row.Station_ID}\tMinT={row.MinT}\tF_Temp={f_temp}")

[Row(Station_ID='ITE00100554', MinT=-148.0, F_Temp=5.36), Row(Station_ID='EZE00100082', MinT=-135.0, F_Temp=7.7)]


In [39]:
# Shut down the Spark session (and its underlying SparkContext) now that the
# results are on the driver. Any DataFrame cell re-run after this point needs
# the session-creation cell run again first.
sparkSession.stop()