In [1]:
from pyspark.sql import SparkSession

# Before start you should be to install JAVA
spark = (SparkSession
.builder
.appName("Weather-Atlas")
.getOrCreate()) # Entry-point to PySpark App

# If click A- [above] or B - [below] in VSC hepls to add code

In [2]:
# Read of DataFrame with auto identification of schema
df = spark.read.format("csv").option('header', "true").load("../dataset/weatherHistory.csv")

df.printSchema() # Help to explore schema of DataFrame

root
 |-- Formatted Date: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Precip Type: string (nullable = true)
 |-- Temperature (C): string (nullable = true)
 |-- Apparent Temperature (C): string (nullable = true)
 |-- Humidity: string (nullable = true)
 |-- Wind Speed (km/h): string (nullable = true)
 |-- Wind Bearing (degrees): string (nullable = true)
 |-- Visibility (km): string (nullable = true)
 |-- Loud Cover: string (nullable = true)
 |-- Pressure (millibars): string (nullable = true)
 |-- Daily Summary: string (nullable = true)



In [3]:
# Exploration of WEATHER DATA
df.show(7)

+--------------------+-------------+-----------+-----------------+------------------------+--------+------------------+----------------------+------------------+----------+--------------------+--------------------+
|      Formatted Date|      Summary|Precip Type|  Temperature (C)|Apparent Temperature (C)|Humidity| Wind Speed (km/h)|Wind Bearing (degrees)|   Visibility (km)|Loud Cover|Pressure (millibars)|       Daily Summary|
+--------------------+-------------+-----------+-----------------+------------------------+--------+------------------+----------------------+------------------+----------+--------------------+--------------------+
|2006-04-01 00:00:...|Partly Cloudy|       rain|9.472222222222221|      7.3888888888888875|    0.89|           14.1197|                 251.0|15.826300000000002|       0.0|             1015.13|Partly cloudy thr...|
|2006-04-01 01:00:...|Partly Cloudy|       rain|9.355555555555558|       7.227777777777776|    0.86|           14.2646|                 259.

In [7]:
df.show(2, vertical=True)

-RECORD 0----------------------------------------
 Formatted Date           | 2006-04-01 00:00:... 
 Summary                  | Partly Cloudy        
 Precip Type              | rain                 
 Temperature (C)          | 9.472222222222221    
 Apparent Temperature (C) | 7.3888888888888875   
 Humidity                 | 0.89                 
 Wind Speed (km/h)        | 14.1197              
 Wind Bearing (degrees)   | 251.0                
 Visibility (km)          | 15.826300000000002   
 Loud Cover               | 0.0                  
 Pressure (millibars)     | 1015.13              
 Daily Summary            | Partly cloudy thr... 
-RECORD 1----------------------------------------
 Formatted Date           | 2006-04-01 01:00:... 
 Summary                  | Partly Cloudy        
 Precip Type              | rain                 
 Temperature (C)          | 9.355555555555558    
 Apparent Temperature (C) | 7.227777777777776    
 Humidity                 | 0.86                 


Count of State of Atmosphere

In [11]:
df.groupBy("Summary").count().show()

+--------------------+-----+
|             Summary|count|
+--------------------+-----+
|              Breezy|   54|
|Humid and Mostly ...|   40|
|  Windy and Overcast|   45|
|               Foggy| 7148|
|Humid and Partly ...|   17|
|     Windy and Foggy|    4|
|Breezy and Partly...|  386|
|                 Dry|   34|
|       Partly Cloudy|31733|
|               Clear|10890|
|       Mostly Cloudy|28094|
|    Breezy and Foggy|   35|
| Breezy and Overcast|  528|
|Dangerously Windy...|    1|
|Breezy and Mostly...|  516|
|Windy and Partly ...|   67|
|               Windy|    8|
|Dry and Partly Cl...|   86|
|Windy and Mostly ...|   35|
|            Overcast|16597|
+--------------------+-----+
only showing top 20 rows



In [6]:
import pyspark.sql.functions as F
df.select("Summary", F.round("Temperature (C)", 3).alias("Temp")).show(5)

+-------------+-----+
|      Summary| Temp|
+-------------+-----+
|Partly Cloudy|9.472|
|Partly Cloudy|9.356|
|Mostly Cloudy|9.378|
|Partly Cloudy|8.289|
|Mostly Cloudy|8.756|
+-------------+-----+
only showing top 5 rows



In [23]:
# Show the fist 15-entries where visibility above 15

df.select(F.col("Formatted Date")).where(F.col("Visibility (km)") > 15).show(15)

+--------------------+
|      Formatted Date|
+--------------------+
|2006-08-17 15:00:...|
|2006-08-23 19:00:...|
|2006-12-27 14:00:...|
|2006-12-27 15:00:...|
|2006-07-15 19:00:...|
|2006-07-16 16:00:...|
|2006-07-17 10:00:...|
|2006-07-19 01:00:...|
|2006-07-04 12:00:...|
|2006-05-13 23:00:...|
|2006-10-12 13:00:...|
|2007-04-09 16:00:...|
|2007-07-14 21:00:...|
|2007-06-22 12:00:...|
|2007-06-23 09:00:...|
+--------------------+
only showing top 15 rows



In [21]:
(df
.select("Precip Type", "Temperature (C)")
.filter(F.col("Precip Type") == "snow")
.filter(F.col("Temperature (C)") <= -10)
.show(5)
)

+-----------+-------------------+
|Precip Type|    Temperature (C)|
+-----------+-------------------+
|       snow|-10.305555555555555|
|       snow|-11.083333333333334|
|       snow|-10.805555555555555|
|       snow|-11.822222222222223|
|       snow|-11.855555555555556|
+-----------+-------------------+
only showing top 5 rows



In [42]:
(
    df
    .groupBy("Summary")
    .count()
    .orderBy(F.col("count").desc())
    .show(8)
)

+--------------------+-----+
|             Summary|count|
+--------------------+-----+
|       Partly Cloudy|31733|
|       Mostly Cloudy|28094|
|            Overcast|16597|
|               Clear|10890|
|               Foggy| 7148|
| Breezy and Overcast|  528|
|Breezy and Mostly...|  516|
|Breezy and Partly...|  386|
+--------------------+-----+
only showing top 8 rows



In [9]:
spark.stop()

In [4]:
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.types as t
import pyspark.sql.functions as f


def extract_dataset(spark: SparkSession) -> DataFrame:
    url = "../dataset/weatherHistory.csv"
    return spark.read.option("header", 'true').csv(url)


def transform_dataset(df: DataFrame) -> DataFrame:
    outcome = (
        df
        .groupBy("Summary")
        .agg(
            f.count("Summary").alias("count"),
            f.min("Apparent Temperature (C)").alias("min_temp"),
            f.max("Apparent Temperature (C)").alias("max_temp")
        )
        .orderBy(f.col("count").desc())
    ).show()

    return outcome


def save_dataset(df: DataFrame) -> None:
    df.coalesce(4).write.mode("overwrite").format("json").save("outcome.json")


def main():
    spark = SparkSession.builder.appName("Weather-Atlas2").getOrCreate()
    df = extract_dataset(spark)
    outcome = transform_dataset(df)
    save_dataset(outcome)
    #spark.stop()

main()

+--------------------+-----+--------------------+------------------+
|             Summary|count|            min_temp|          max_temp|
+--------------------+-----+--------------------+------------------+
|       Partly Cloudy|31733|-0.00555555555555...| 9.994444444444447|
|       Mostly Cloudy|28094|-0.00555555555555...| 9.994444444444447|
|            Overcast|16597|-0.00555555555555...| 9.994444444444447|
|               Clear|10890|-0.00555555555555...| 9.994444444444447|
|               Foggy| 7148|-0.00555555555555...| 9.994444444444447|
| Breezy and Overcast|  528|-0.03888888888888905| 6.661111111111112|
|Breezy and Mostly...|  516|-0.13333333333333247| 6.527777777777778|
|Breezy and Partly...|  386|-0.00555555555555...| 6.611111111111111|
|Dry and Partly Cl...|   86|                20.0| 35.67222222222222|
|Windy and Partly ...|   67| -0.7111111111111117| 5.977777777777777|
|          Light Rain|   63|  10.166666666666664|  8.11111111111111|
|              Breezy|   54|-0.016

AttributeError: 'NoneType' object has no attribute 'coalesce'