MapReduce can help us quickly pull summary statistics from our two datasets (energy and weather).

In [None]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 65kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 38.8MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=099ca452aabe4112d20d1b14e681b78da5b0e9b8f7e8e3463ee723a71cd8fe5a
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


Import PySpark and create the Spark context and session:

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import *

# Configuration for a local Spark instance using all available cores.
conf = SparkConf().setMaster('local[*]')

# Create the context. getOrCreate() returns the already-running context when
# this cell is re-executed; calling SparkContext(...) directly a second time
# raises "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf)

# Create (or reuse) the session; the builder reuses the active SparkContext.
spark = SparkSession.builder.getOrCreate()

Create the DataFrames from our CSV files:

In [None]:
# header=True: take column names from the first row of each file.
# inferSchema=True: let Spark detect column types (e.g. numeric columns)
# instead of reading every column as a string.
energy_df = spark.read.options(header=True, inferSchema=True).csv('energy_dataset.csv')
energy_df.show(n=10)

weather_df = spark.read.options(header=True, inferSchema=True).csv('weather_features.csv')
weather_df.show(n=10)

+--------------------+------------------+------------------------------------+----------------------------------+---------------------+---------------------------+---------------------+---------------------------+----------------------+---------------------+------------------------------------------+-------------------------------------------+------------------------------------------+--------------------------------+-----------------+------------------+----------------+--------------------------+----------------+----------------+------------------------+-----------------------+------------------------+---------------------------------+-------------------------------+-------------------+-----------------+---------------+------------+
|                time|generation biomass|generation fossil brown coal/lignite|generation fossil coal-derived gas|generation fossil gas|generation fossil hard coal|generation fossil oil|generation fossil oil shale|generation fossil peat|generation geotherma

Some PySpark queries, using both RDD map-reduce operations and the DataFrame API:

In [None]:
# Per-city mean of row field 3, computed map-reduce style: emit
# (city, (value, 1)), add values and counts per key, then divide.
# NOTE(review): columns are selected by position; confirm index 3 is the
# intended temperature column (if the CSV has separate temp/temp_min/temp_max
# columns, index 3 may not be plain `temp`).
print("Average temperature by city:\n")
(
    weather_df.rdd
    .map(lambda row: (row[1], (row[3], 1)))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .mapValues(lambda sum_count: sum_count[0] / sum_count[1])
    .take(5)
)

Average temperature by city:



[('Bilbao', 284.91666115356213),
 (' Barcelona', 288.59470412145697),
 ('Valencia', 290.2222765464678),
 ('Madrid', 286.82487717417484),
 ('Seville', 291.1841030547889)]

In [None]:
# Count, per city, the rows reporting any snow in the snow_3h column.
(
    weather_df
    .where(weather_df['snow_3h'] > 0)
    .groupBy('city_name')
    .count()
    .take(5)
)

[Row(city_name='Madrid', count=1),
 Row(city_name='Bilbao', count=257),
 Row(city_name='Valencia', count=9)]

In [None]:
# Count, per city, the rows whose weather_main value is exactly 'clear'.
(
    weather_df
    .where(weather_df['weather_main'] == 'clear')
    .groupBy('city_name')
    .count()
    .take(5)
)

[Row(city_name='Madrid', count=20356),
 Row(city_name='Seville', count=23581),
 Row(city_name=' Barcelona', count=14760),
 Row(city_name='Bilbao', count=8453),
 Row(city_name='Valencia', count=15535)]

In [None]:
# Per-city mean of row field 5, computed map-reduce style: emit
# (city, (value, 1)), add values and counts per key, then divide.
print("Average pressure by city:\n")
(
    weather_df.rdd
    .map(lambda row: (row[1], (row[5], 1)))
    # Both slots of (value_sum, row_count) must be added. Returning
    # (sum, count_a, count_b) instead leaves the count unsummed, so the
    # final division used a partial count rather than the number of rows.
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .mapValues(lambda sum_count: sum_count[0] / sum_count[1])
    .take(5)
)

Average pressure by city:



[('Bilbao', 36582567.0),
 (' Barcelona', 45551556.0),
 ('Valencia', 35706399.0),
 ('Madrid', 36696345.0),
 ('Seville', 36214972.0)]

In [None]:
# Per-city mean of row field 13, computed map-reduce style: emit
# (city, (value, 1)), add values and counts per key, then divide.
print("Average weather id by city: \n")
(
    weather_df.rdd
    .map(lambda row: (row[1], (row[13], 1)))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .mapValues(lambda sum_count: sum_count[0] / sum_count[1])
    .take(5)
)

Average weather id by city: 



[('Bilbao', 723.9432282829407),
 (' Barcelona', 760.917465328673),
 ('Valencia', 781.2282828282828),
 ('Madrid', 762.2602641519838),
 ('Seville', 771.4098489748854)]

In [None]:
# Per-city mean of row field 11, computed map-reduce style: emit
# (city, (value, 1)), add values and counts per key, then divide.
print("Average snow_3h per city:\n")
(
    weather_df.rdd
    .map(lambda row: (row[1], (row[11], 1)))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .mapValues(lambda sum_count: sum_count[0] / sum_count[1])
    .take(5)
)

Average snow_3h per city:



[('Bilbao', 0.02345514728380295),
 (' Barcelona', 0.0),
 ('Valencia', 0.00015364916773367477),
 ('Madrid', 2.8924366503984343e-05),
 ('Seville', 0.0)]

In [None]:
from pyspark.sql.functions import *
# NOTE: the wildcard import above replaces Python's builtin min/max (and other
# builtins such as sum/round/abs) with Spark column functions for every cell
# that runs after this one; later cells rely on these Spark versions.

# Min and max of each price column, in the order: min, max per column.
energy_df.agg(*[
    agg_fn(col(column))
    for column in ('price actual', 'price day ahead')
    for agg_fn in (min, max)
]).show()

+-----------------+-----------------+--------------------+--------------------+
|min(price actual)|max(price actual)|min(price day ahead)|max(price day ahead)|
+-----------------+-----------------+--------------------+--------------------+
|             9.33|            116.8|                2.06|              101.99|
+-----------------+-----------------+--------------------+--------------------+



In [None]:
# Min and max of selected generation columns, in the order: min, max per column.
# `min`, `max` and `col` here are the Spark functions brought in by the earlier
# `from pyspark.sql.functions import *` cell, not the Python builtins.
energy_df.agg(*[
    agg_fn(col(column))
    for column in ('generation solar', 'generation wind onshore',
                   'generation nuclear', 'generation other renewable')
    for agg_fn in (min, max)
]).show()

+---------------------+---------------------+----------------------------+----------------------------+-----------------------+-----------------------+-------------------------------+-------------------------------+
|min(generation solar)|max(generation solar)|min(generation wind onshore)|max(generation wind onshore)|min(generation nuclear)|max(generation nuclear)|min(generation other renewable)|max(generation other renewable)|
+---------------------+---------------------+----------------------------+----------------------------+-----------------------+-----------------------+-------------------------------+-------------------------------+
|                  0.0|               5792.0|                         0.0|                     17436.0|                    0.0|                 7117.0|                            0.0|                          119.0|
+---------------------+---------------------+----------------------------+----------------------------+-----------------------+---------