# Libraries

In [1]:
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import sys
os.environ['PYSPARK_DRIVER_PYTHON_OPTS']= "notebook"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable
from pyspark.sql import SparkSession, DataFrame
import requests
import calendar
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType, IntegerType
from pyspark.sql.functions import col, unix_timestamp
from functools import reduce
from pyspark.sql.functions import count, when, isnull, countDistinct
from pyspark.sql import functions as F
from pyspark.sql.window import Window





#### Calling the Api and create Dataframe with pyspark

In [3]:
# create spark session 
spark = SparkSession.builder.appName('data_exploration').getOrCreate()

In [4]:
# Define the mapping of month names
months= {
    1: 'January',
    2: 'February',
    3: 'March',
    4: 'April',
    5: 'May',
    6: 'June',
    7: 'July',
    8: 'August',
    9: 'September',
    10: 'October',
    11: 'November',
    12: 'December'
}


# Define a list for the DataFrames of the monthly prices
dfs_monthly_prices = []

# Define the parameters for the API request
params = {
    'time_trunc': 'hour',
    'geo_limit': 'peninsular',
    'geo_ids': '8741'
}

# Define the base URL for the API request
base_url = 'https://apidatos.ree.es/es/datos/mercados/precios-mercados-tiempo-real?'

# Iterate through the months of the year 2022
for month in range(1, 13):  # 1 to 12 for January to December
    # Get the last day of the current month
    last_day = calendar.monthrange(2022, month)[1]

    # Define the start and end dates for each month
    start_date = f'2022-{month:02d}-01T00:00'
    end_date = f'2022-{month:02d}-{last_day:02d}T23:59'

    # Add the dates to the parameters
    params['start_date'] = start_date
    params['end_date'] = end_date

    # Make the HTTP request
    response = requests.get(base_url, params=params)
    data = response.json()

    # Extract the price data for the current month
    df_monthly_prices = data['included'][0]['attributes']['values']

    # Define the schema for the DataFrame
    schema = StructType([
        StructField("value", StringType(), True),
        StructField("percentage", StringType(), True),
        StructField("datetime", StringType(), True)
    ])

    # Create the DataFrame for the current month
    df = spark.createDataFrame(df_monthly_prices, schema=schema)

    # Convert the datatypes of the columns
    df = df.withColumn("value", col("value").cast(DoubleType()))
    df = df.withColumn("percentage", col("percentage").cast(IntegerType()))
    df = df.withColumn("datetime", unix_timestamp(col("datetime"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX").cast(TimestampType()))

    # Add the DataFrame of the current month to the list
    dfs_monthly_prices.append(df)

    # Name the DataFrame with the month name in English
    month_name = months[month]
    globals()[f'df_prices_{month_name}'] = df


# Concatena todos los DataFrames en uno solo
df_yearly_prices = reduce(DataFrame.union, dfs_monthly_prices)

# make global the df_yearly_prices
globals()['df_yearly_prices'] = df_yearly_prices






In [5]:
df_yearly_prices.show(5)


+------+----------+-------------------+
| value|percentage|           datetime|
+------+----------+-------------------+
|204.51|         1|2022-01-01 00:00:00|
|171.35|         1|2022-01-01 01:00:00|
| 172.7|         1|2022-01-01 02:00:00|
|156.07|         1|2022-01-01 03:00:00|
|159.08|         1|2022-01-01 04:00:00|
+------+----------+-------------------+
only showing top 5 rows



#### Exploratory Analysis


In [6]:
df_yearly_prices.show(10)

+------+----------+-------------------+
| value|percentage|           datetime|
+------+----------+-------------------+
|204.51|         1|2022-01-01 00:00:00|
|171.35|         1|2022-01-01 01:00:00|
| 172.7|         1|2022-01-01 02:00:00|
|156.07|         1|2022-01-01 03:00:00|
|159.08|         1|2022-01-01 04:00:00|
|157.28|         1|2022-01-01 05:00:00|
|158.87|         1|2022-01-01 06:00:00|
|157.58|         1|2022-01-01 07:00:00|
|156.18|         1|2022-01-01 08:00:00|
|118.96|         1|2022-01-01 09:00:00|
+------+----------+-------------------+
only showing top 10 rows



In [7]:
df_yearly_prices.printSchema()

root
 |-- value: double (nullable = true)
 |-- percentage: integer (nullable = true)
 |-- datetime: timestamp (nullable = true)



In [8]:
# count the number of null values in each column
null_values = df_yearly_prices.select([count(when(isnull(c), c)).alias(c) for c in df.columns])

null_values.show()


+-----+----------+--------+
|value|percentage|datetime|
+-----+----------+--------+
|    0|         0|       0|
+-----+----------+--------+



In [9]:
# Shape of the DataFrame
print(f'Number of rows: {df_yearly_prices.count()}')
print(f'Number of columns: {len(df_yearly_prices.columns)}')

Number of rows: 8760
Number of columns: 3


In [10]:
# Number of distinct values in each column
df.select([countDistinct(col(c)).alias(c) for c in df.columns]).show()


+-----+----------+--------+
|value|percentage|datetime|
+-----+----------+--------+
|  739|         1|     744|
+-----+----------+--------+



#### Feature engineering

In [11]:
# group by value and count the number of occurrences
top_values = df_yearly_prices.groupBy("value").agg(F.count("value").alias("count"))

# Order the values by the count in descending order
top_values = top_values.orderBy(F.desc("count"))

# Limit the number of values to 10
top_10_values = top_values.limit(10)
top_10_values.show()


+------+-----+
| value|count|
+------+-----+
|319.66|    4|
|200.68|    4|
| 239.8|    4|
|276.59|    4|
| 272.6|    4|
|277.14|    4|
| 171.3|    4|
|263.16|    4|
|256.45|    4|
| 262.3|    4|
+------+-----+



In [12]:
# average price per month and percentage growth compared to the previous month
monthly_prices = df_yearly_prices.groupBy(F.month("datetime").alias("month")).agg(F.avg("value").alias("avg_price"))


window_spec = Window.orderBy("month")
monthly_prices = monthly_prices.withColumn("% growth", (F.col("avg_price") - F.lag("avg_price").over(window_spec)) / F.lag("avg_price").over(window_spec) * 100)

monthly_prices.show()

+-----+------------------+-------------------+
|month|         avg_price|           % growth|
+-----+------------------+-------------------+
|    1|285.86005376344076|               null|
|    2|286.30842261904763| 0.1568490769185658|
|    3|387.74535666218026|  35.42925252258513|
|    4|268.63352777777783|-30.719085822135987|
|    5|  259.933561827957| -3.238600193278069|
|    6| 290.9040694444444| 11.914778298997009|
|    7| 331.3574327956989| 13.906083688863657|
|    8| 395.2318817204301| 19.276600613970086|
|    9| 316.2872361111111|   -19.974260493783|
|   10|226.72640268456377|-28.316297087336395|
|   11|189.43786111111112| -16.44649283538929|
|   12|206.44568548387096|  8.978049199354205|
+-----+------------------+-------------------+



In [13]:
# for every week, maximum, minimum and average price per day
weekly_prices = df_yearly_prices.groupBy(F.weekofyear("datetime").alias("week"), F.dayofweek("datetime").alias("day")).agg(F.max("value").alias("max_price"), F.min("value").alias("min_price"), F.avg("value").alias("avg_price"))
weekly_daily_prices = weekly_prices.orderBy("week", "day")
weekly_daily_prices.show()



+----+---+---------+---------+------------------+
|week|day|max_price|min_price|         avg_price|
+----+---+---------+---------+------------------+
|   1|  1|   306.97|    36.35|168.47250000000003|
|   1|  2|   326.48|   164.71|236.62625000000003|
|   1|  3|   332.83|   152.68| 239.9920833333333|
|   1|  4|   408.58|   114.73|          272.3175|
|   1|  5|   352.09|   235.25|276.80791666666664|
|   1|  6|   422.83|    225.3|317.19958333333335|
|   1|  7|   347.83|   199.53|267.54041666666666|
|   2|  1|   337.73|   242.96|         275.07625|
|   2|  2|   464.32|   220.69| 319.3666666666666|
|   2|  3|   426.54|   252.26| 322.9820833333333|
|   2|  4|   421.94|   232.98|           301.325|
|   2|  5|   425.48|   240.19| 313.0195833333333|
|   2|  6|   392.52|   250.46|298.06374999999997|
|   2|  7|   339.66|   251.89|         281.13875|
|   3|  1|   317.45|   234.32|260.56458333333336|
|   3|  2|   452.94|   277.28|350.23541666666665|
|   3|  3|   437.62|   254.54|322.42708333333337|


In [14]:
# convert to csv df_yearly_prices in folder Data pandas
df_yearly_prices.toPandas().to_csv('../Data/df_yearly_prices.csv', index=False)


  series = series.astype(t, copy=False)


In [None]:
# Suponiendo que 'df_yearly_prices' contiene el DataFrame que deseas guardar
# Define la ruta donde deseas guardar el archivo CSV
ruta_csv = '../Data/df_yearly_prices_pyspark.csv'

# Escribe el DataFrame en formato CSV
df_yearly_prices.write.csv('../Data/df_yearly_prices_pyspark.csv', header=True)
