In [10]:
#utils.py
import logging

# Module logger for these helpers, with INFO and above enabled.
logger = logging.getLogger(name='exemplo')
logger.setLevel('INFO')
# Smoke-test message confirming the logger emits INFO records.
logger.log(logging.INFO, 'ola que taaal')


def count_duplicates(df, target_columns):
  """Print (and return) how many rows remain once ``df`` is de-duplicated on a key.

  Subtracting the number of *duplicated keys* from the row count is only
  correct when every duplicated key appears exactly twice. Counting the
  distinct key groups is exact: it equals the number of rows that
  ``df.dropDuplicates(target_columns)`` keeps.

  Args:
    df: Spark DataFrame to inspect.
    target_columns: column name, or list of column names, forming the key.

  Returns:
    The number of rows left after de-duplicating on ``target_columns``.
  """
  # groupBy yields one row per distinct key (null keys form a group of their
  # own, as in dropDuplicates), so counting the groups gives the survivors.
  # This also avoids depending on a `col` import from another cell.
  remaining = df.groupBy(target_columns).count().count()
  print("No duplicados", remaining)
  return remaining


INFO:exemplo:ola que taaal


In [8]:
#report_data_quality.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, sum


# Spark session shared by the data-quality helpers below (get_data reads it).
spark = (
    SparkSession.builder
    .appName("SparkDQ")
    .getOrCreate()
)

# TODO: schema validation is still missing. Callers can pass an explicit
# `schema` to enforce one instead of relying on inferSchema.
def get_data(input_path, schema=None):
    """Read a CSV file (with header row) into a Spark DataFrame and print diagnostics.

    Prints the source path, the total row count and the resulting schema.
    Reads through the module-level ``spark`` session.

    Args:
        input_path: path of the CSV file to read.
        schema: optional explicit schema (``StructType`` or DDL string). When
            omitted (default), the schema is inferred from the data.

    Returns:
        The loaded DataFrame.
    """
    # Generic wording: this loader is used for both the weather and the energy files.
    print(f"Leyendo datos desde: {input_path}")
    df = spark.read.csv(input_path, header=True, schema=schema,
                        inferSchema=schema is None)
    print("Total registros", df.count())

    print("Mostrando esquema de:", input_path)
    df.printSchema()

    return df


# Data rule (from RAUL): every field must be populated except weather_id and
# weather_icon. NOTE(review): this helper only reports counts; the rule itself
# is not enforced here.
def count_nulls(df):
  """Show, and return, the number of null values in each column of ``df``.

  Args:
    df: Spark DataFrame to inspect.

  Returns:
    A one-row DataFrame with one column per input column (same names),
    each holding that column's null count.
  """
  # Use the namespaced Spark `sum`. The bare module-level `sum` is Spark's only
  # because a file-level import shadows Python's builtin.
  from pyspark.sql import functions as F

  null_count = df.select([
    F.sum(F.col(df_col).isNull().cast('int')).alias(df_col)
    for df_col in df.columns
  ])
  null_count.show()
  return null_count


def weather_quality_rules(df):
  """Print how many rows fall inside the plausible range of each weather field.

  Temperatures are checked in Kelvin (238.15-321.15 K, i.e. -35 to 48 ºC).
  Wind speed is checked in m/s (0-68.88, about 248 km/h). These are the raw
  units the ETL converts from. Rows whose value is null do not count as in range.

  All counts come from a single aggregation rather than one Spark job (and
  full scan) per rule.

  Args:
    df: weather DataFrame containing the columns listed in ``rules`` below.

  Returns:
    dict mapping each checked column name to its in-range row count.
  """
  # TODO (from RAUL): confirm temperature and wind really are in Kelvin and m/s
  # before trusting these bounds.
  from pyspark.sql import functions as F

  # (printed label, column, lower bound, upper bound or None when open-ended)
  rules = [
    ("temp: ", "temp", 238.15, 321.15),
    ("temp_min:", "temp_min", 238.15, 321.15),
    ("temp_max:", "temp_max", 238.15, 321.15),
    ("pressure:", "pressure", 800, 1100),
    ("humidity:", "humidity", 0, 100),
    ("speed:", "wind_speed", 0, 68.88),
    ("deg:", "wind_deg", 0, 360),
    ("rain1:", "rain_1h", 0, None),
    ("rain3:", "rain_3h", 0, None),
    ("snow3:", "snow_3h", 0, None),
    ("clouds:", "clouds_all", 0, 100),
  ]

  def in_range(name, low, high):
    # Inclusive bounds, matching the >= / <= comparisons of the checks.
    condition = F.col(name) >= low
    if high is not None:
      condition = condition & (F.col(name) <= high)
    return condition

  # when() is null whenever the condition is false or null, and count() skips
  # nulls, so each value equals df.filter(condition).count().
  row = df.agg(*[
    F.count(F.when(in_range(name, low, high), 1)).alias(name)
    for _, name, low, high in rules
  ]).first()

  # Same output layout as before: label, count, label, count, ...
  parts = []
  for (label, _, _, _), n in zip(rules, row):
    parts.extend([label, n])
  print(*parts)

  return {name: n for (_, name, _, _), n in zip(rules, row)}


# Run the data-quality report. try/finally releases the Spark session even
# if one of the checks raises (e.g. a missing column).
try:
    # WEATHER
    print("Weather checks")
    weather_df = get_data("weather_features.csv")
    count_duplicates(weather_df, ["dt_iso", "city_name"])
    count_nulls(weather_df)
    weather_quality_rules(weather_df)

    # ENERGY
    print("\nEnergy checks")
    energy_df = get_data("energy_dataset.csv")
    count_duplicates(energy_df, ["time"])
    count_nulls(energy_df)
finally:
    spark.stop()


Weather checks
Leyendo weather data desde: weather_features.csv
Total registros 178396
Mostrando esquema de: weather_features.csv
root
 |-- dt_iso: timestamp (nullable = true)
 |-- city_name: string (nullable = true)
 |-- temp: double (nullable = true)
 |-- temp_min: double (nullable = true)
 |-- temp_max: double (nullable = true)
 |-- pressure: integer (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- wind_speed: integer (nullable = true)
 |-- wind_deg: integer (nullable = true)
 |-- rain_1h: double (nullable = true)
 |-- rain_3h: double (nullable = true)
 |-- snow_3h: double (nullable = true)
 |-- clouds_all: integer (nullable = true)
 |-- weather_id: integer (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- weather_icon: string (nullable = true)

No duplicados 175598
+------+---------+----+--------+--------+--------+--------+----------+--------+-------+-------+-------+----------+----------+------------

In [22]:
#etl.py

from functools import reduce

# Local imports so this cell runs on a fresh kernel without relying on names
# imported by the data-quality cell.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("SparkETL").getOrCreate()

#--------------WEATHER------------
input_path_weather = "weather_features.csv"
# Read through the variable above instead of repeating the literal path.
weather_df = spark.read.csv(input_path_weather, header=True, inferSchema=True)

# Drop duplicate readings: keep one row per (timestamp, city).
weather_df = weather_df.dropDuplicates(["dt_iso", "city_name"])

# Temperatures arrive in Kelvin; convert them to ºC.
for temp_col in ["temp", "temp_min", "temp_max"]:
  weather_df = weather_df.withColumn(temp_col, col(temp_col) - 273.15)

# Wind speed arrives in m/s; convert it to km/h.
weather_df = weather_df.withColumn('wind_speed', col('wind_speed') * 3.6)

# Rename dt_iso to time, the name of the energy dataset's timestamp column
# (presumably so the two tables can be joined later).
weather_df = weather_df.withColumnRenamed("dt_iso", "time")

# Remove physically impossible pressure and wind values (248 km/h is about
# the 68.88 m/s bound used by the quality checks). Comparisons with null are
# not true, so rows with a null pressure or wind speed are dropped as well.
weather_df = weather_df.filter((col("pressure") >= 800) & (col("pressure") <= 1100))
weather_df = weather_df.filter((col("wind_speed") >= 0) & (col("wind_speed") <= 248))

# Drop columns that are not needed for the analysis.
weather_df = weather_df.drop('weather_id').drop('weather_icon')


#--------------ENERGY------------
# Local import so this section does not rely on another cell having imported col.
from pyspark.sql.functions import col

input_path_energy = "energy_dataset.csv"
# Read through the variable above instead of repeating the literal path.
energy_df = spark.read.csv(input_path_energy, header=True, inferSchema=True)
# Drop duplicate timestamps.
energy_df = energy_df.dropDuplicates(['time'])

# Diagnostic: count the rows in which ALL of these generation columns are null
# at the same time (18 in the recorded output).
columnas_a_verificar = ["generation biomass", "generation fossil brown coal/lignite", "generation fossil coal-derived gas", "generation fossil gas", "generation fossil hard coal", "generation fossil oil", "generation fossil oil shale", "generation fossil peat", "generation geothermal", "generation hydro pumped storage consumption"]
# AND together one isNull() test per column (the first one seeds the fold).
condicion_nulos = reduce(lambda acc, c: acc & col(c).isNull(),
                         columnas_a_verificar[1:],
                         col(columnas_a_verificar[0]).isNull())
df_nulos = energy_df.filter(condicion_nulos)
print(df_nulos.count())

# Drop those all-null ROWS by filtering on a single column. This assumes the
# nulls in "generation fossil brown coal/lignite" are exactly those rows.
# NOTE(review): confirm, since any other null in this column also drops its row.
energy_df = energy_df.dropna(subset=["generation fossil brown coal/lignite"])

# Median imputation: the median is less sensitive to outliers than the mean.
median_columns = [
    'generation biomass',
    'generation fossil oil',
    'generation hydro pumped storage consumption',
    'generation hydro run-of-river and poundage',
    'generation marine',
    'generation waste',
    'total load actual',
]
# One multi-column approxQuantile call computes every median in a single
# aggregation instead of seven; each entry is the [median] list of one column.
(median_biomass, median_oil, median_hydro, median_river,
 median_marine, median_waste, median_total_load) = [
    quantiles[0]
    for quantiles in energy_df.approxQuantile(median_columns, [0.5], 0.01)
]

# 'Unknown' is used for the columns the author found to be entirely null.
# TODO: consider dropping 'forecast wind offshore eday ahead' altogether (it
# adds no information), and decide whether 'generation hydro pumped storage
# aggregated' should really stay as 'Unknown'.
# NOTE(review): a string fill only takes effect if Spark inferred these columns
# as strings; check printSchema before relying on it.
energy_df = energy_df.fillna({'generation biomass': median_biomass,
                              'generation fossil oil': median_oil,
                              'generation hydro pumped storage aggregated': 'Unknown',
                              'generation hydro pumped storage consumption': median_hydro,
                              'generation hydro run-of-river and poundage': median_river,
                              'generation marine': median_marine,
                              'generation waste': median_waste,
                              'forecast wind offshore eday ahead': 'Unknown',
                              'total load actual': median_total_load})

# Final checks on the cleaned data: remaining nulls per energy column, then
# summary statistics for both tables.
count_nulls(energy_df)
weather_df.describe().show()
energy_df.describe().show()

# Release the Spark session created at the top of this cell.
spark.stop()

18
+----+------------------+------------------------------------+----------------------------------+---------------------+---------------------------+---------------------+---------------------------+----------------------+---------------------+------------------------------------------+-------------------------------------------+------------------------------------------+--------------------------------+-----------------+------------------+----------------+--------------------------+----------------+----------------+------------------------+-----------------------+------------------------+---------------------------------+-------------------------------+-------------------+-----------------+---------------+------------+
|time|generation biomass|generation fossil brown coal/lignite|generation fossil coal-derived gas|generation fossil gas|generation fossil hard coal|generation fossil oil|generation fossil oil shale|generation fossil peat|generation geothermal|generation hydro pumped sto