In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
from pyspark.sql.functions import round 

In [2]:
spark1 = SparkSession.builder \
    .master('local') \
    .appName('MyTest') \
    .getOrCreate()

In [3]:
# sc = spark1._sc

In [4]:
schema1 = StructType([ \
        StructField("stationId", StringType(), True),\
        StructField("time", StringType(), True),\
        StructField("entryType", StringType(), True),\
        StructField("temp", IntegerType(), True),\
    ])

In [5]:
df = spark1.read\
    .schema(schema1)\
    .csv('1800temps.csv')

In [6]:
df.printSchema()

root
 |-- stationId: string (nullable = true)
 |-- time: string (nullable = true)
 |-- entryType: string (nullable = true)
 |-- temp: integer (nullable = true)



In [7]:
df = df.withColumn("tempF", round(df.temp * 0.1 * (0.9/0.5) + 32.0,2))
df.show()

+-----------+--------+---------+----+-----+
|  stationId|    time|entryType|temp|tempF|
+-----------+--------+---------+----+-----+
|ITE00100554|18000101|     TMAX| -75| 18.5|
|ITE00100554|18000101|     TMIN|-148| 5.36|
|GM000010962|18000101|     PRCP|   0| 32.0|
|EZE00100082|18000101|     TMAX| -86|16.52|
|EZE00100082|18000101|     TMIN|-135|  7.7|
|ITE00100554|18000102|     TMAX| -60| 21.2|
|ITE00100554|18000102|     TMIN|-125|  9.5|
|GM000010962|18000102|     PRCP|   0| 32.0|
|EZE00100082|18000102|     TMAX| -44|24.08|
|EZE00100082|18000102|     TMIN|-130|  8.6|
|ITE00100554|18000103|     TMAX| -23|27.86|
|ITE00100554|18000103|     TMIN| -46|23.72|
|GM000010962|18000103|     PRCP|   4|32.72|
|EZE00100082|18000103|     TMAX| -10| 30.2|
|EZE00100082|18000103|     TMIN| -73|18.86|
|ITE00100554|18000104|     TMAX|   0| 32.0|
|ITE00100554|18000104|     TMIN| -13|29.66|
|GM000010962|18000104|     PRCP|   0| 32.0|
|EZE00100082|18000104|     TMAX| -55| 22.1|
|EZE00100082|18000104|     TMIN|

In [8]:
df.registerTempTable("temps")
df.describe().show()
df.show()

+-------+-----------+-------------------+---------+-----------------+------------------+
|summary|  stationId|               time|entryType|             temp|             tempF|
+-------+-----------+-------------------+---------+-----------------+------------------+
|  count|       1825|               1825|     1825|             1825|              1825|
|   mean|       null|1.800066832328767E7|     null|97.53479452054795| 49.55626301369864|
| stddev|       null|  345.0964453798429|     null|93.40225576440513|16.812406037592925|
|    min|EZE00100082|           18000101|     PRCP|             -148|              5.36|
|    max|ITE00100554|           18001231|     TMIN|              323|             90.14|
+-------+-----------+-------------------+---------+-----------------+------------------+

+-----------+--------+---------+----+-----+
|  stationId|    time|entryType|temp|tempF|
+-----------+--------+---------+----+-----+
|ITE00100554|18000101|     TMAX| -75| 18.5|
|ITE00100554|18000101|

In [9]:
schemaStation = StructType([ \
        StructField("stationId", StringType(), True),\
        StructField("city", StringType(), True),\
    ])

In [10]:
dfStation = spark1.read\
    .schema(schemaStation)\
    .csv('1800stations.csv')

In [11]:
dfStation.registerTempTable("stations")
dfStation.show()

+-----------+----------------+
|  stationId|            city|
+-----------+----------------+
|ITE00100554|           Milan|
|EZE00100082|          Prague|
|GM000010962|Hohenpeissenberg|
+-----------+----------------+



In [12]:
        # max(t.temp * 0.1 * (0.9/0.5) + 32.0) as maxTemp 
    
dfJoin = spark1.sql("\
    select s.city, t.stationId, \
        max(tempF) as maxTemp \
    from temps t join stations s on s.stationId = t.stationId \
    where entryType = 'TMAX' \
    group by s.city, t.stationId")
dfJoin.show()

+------+-----------+-------+
|  city|  stationId|maxTemp|
+------+-----------+-------+
|Prague|EZE00100082|  90.14|
| Milan|ITE00100554|  90.14|
+------+-----------+-------+

