In [10]:
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import os
import pandas as pd
from pyspark.sql.functions import regexp_replace, count, avg, when, concat, lit, regexp_extract, udf
import requests

In [3]:
# Create (or reuse) the Spark session for the cadastral import, requesting
# 12g of memory for both the executors and the driver.
spark = (
    SparkSession.builder
    .appName("CatastroDataImport")
    .config("spark.executor.memory", "12g")
    .config("spark.driver.memory", "12g")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/20 21:40:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Worksheet to load and the location of the cadastral Excel workbook
sheet_name = 'detalle consulta'
catastro_data_path = "/Users/maxheilingbrunner/Documents/solar-energy-spatial-prediction/data/cadastral/cadastral_malaga.xlsx"

# Load the worksheet with pandas first, then hand the result over to Spark
pandas_df = pd.read_excel(catastro_data_path, sheet_name=sheet_name)
catastro_df = spark.createDataFrame(pandas_df)

# Metrics computed for every usage category: how many built-surface values
# are present, their mean, and the mean participation coefficient
built_surface_col = "Superficie construida metros cuadrados (SUP)"
summary_metrics = [
    count(built_surface_col).alias("Count"),
    avg(built_surface_col).alias("Average_SUP"),
    avg("Coeficiente de participacion (CPA)").alias("Average_CPA"),
]

# One summary row per value of the usage column
temp = catastro_df.groupBy("Uso (USO)").agg(*summary_metrics)

# Print the summary table
temp.show()

[Stage 0:>                                                        (0 + 10) / 10]

+---------------+-----+------------------+-----------------+
|      Uso (USO)|Count|       Average_SUP|      Average_CPA|
+---------------+-----+------------------+-----------------+
| Almacén,Estac.|15488|27.154378744834705|51.69298245614035|
|       Cultural|  169| 32.03717159763314|            100.0|
| Edif. Singular|   33|57.102181818181805|            100.0|
|Ocio,Hostelería|  102|114.41749019607842|97.56756756756756|
|        Agrario|    8|         13.226875|            100.0|
|       Oficinas| 1398|250.98573605150207|74.60408163265306|
|Suelo sin edif.|   17|262.33935294117646|            100.0|
|   Espectáculos|   17| 413.9101176470588|            100.0|
|     Industrial| 3172|307.96646815888977|99.38726158038148|
|      Religioso|   21|223.35880952380953|            100.0|
|      Deportivo|  101|161.08853465346536| 98.9010989010989|
|      Comercial| 5804| 93.16804927636112|68.49095607235142|
|    Residencial|46934|102.16753300379258|66.82550518842163|
|Sanidad,Benefic|   57|1

                                                                                

In [15]:
# Folder where the monthly consumption CSV files are located
consumption_data_path = "/Users/maxheilingbrunner/Documents/solar-energy-spatial-prediction/data/consumption"

# First and last month (both inclusive) of the period to load. strptime with
# only %Y%m yields the first day of the month at midnight.
start_month = datetime.strptime("202203", "%Y%m")
end_month = datetime.strptime("202302", "%Y%m")

# Accumulates the union of every monthly file found; stays None if none exist
malaga_df = None

# Walk the period one month at a time. current_month is always the first day
# of a month, so a plain datetime comparison covers the year/month check.
current_month = start_month
while current_month <= end_month:
    # Monthly files are named <YYYYMM>_ELECTRICIDAD_MALAGA.csv
    file_suffix = current_month.strftime("%Y%m")
    file_electricity = os.path.join(consumption_data_path, f"{file_suffix}_ELECTRICIDAD_MALAGA.csv")

    if os.path.exists(file_electricity):
        # NOTE(review): the recorded run of this cell failed with
        # java.lang.OutOfMemoryError inside the CSV parser while it was
        # expanding a field buffer. That may indicate a delimiter/quote
        # setting that does not match these files -- confirm against the raw CSVs.
        electricity_df = spark.read \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .csv(file_electricity)

        # union() pairs columns by position, not by name.
        # NOTE(review): this presumes every monthly file has the same column
        # order -- verify, or switch to unionByName if they can differ.
        malaga_df = electricity_df if malaga_df is None else malaga_df.union(electricity_df)
    else:
        print(f"The files for {file_suffix} were not found in the folder '{consumption_data_path}'.")

    # Move to the first day of the next month, rolling over into the next year after December
    month, year = (current_month.month + 1, current_month.year) if current_month.month < 12 else (1, current_month.year + 1)
    current_month = datetime(year, month, 1)

# malaga_df now holds the concatenated monthly data (or None when no file was
# found). Test against None explicitly instead of relying on the truthiness
# of a DataFrame object.
if malaga_df is not None:
    malaga_df.show()
else:
    print("No data to process.")

23/06/20 21:50:22 ERROR Executor: Exception in task 79.0 in stage 17.0 (TID 492)
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.Arrays.copyOf(Arrays.java:3633)
	at com.univocity.parsers.common.input.ExpandingCharAppender.expand(ExpandingCharAppender.java:115)
	at com.univocity.parsers.common.input.ExpandingCharAppender.expand(ExpandingCharAppender.java:119)
	at com.univocity.parsers.common.input.ExpandingCharAppender.expandAndRetry(ExpandingCharAppender.java:107)
	at com.univocity.parsers.common.input.ExpandingCharAppender.appendUntil(ExpandingCharAppender.java:179)
	at com.univocity.parsers.common.input.ExpandingCharAppender.appendUntil(ExpandingCharAppender.java:180)
	at com.univocity.parsers.common.input.ExpandingCharAppender.appendUntil(ExpandingCharAppender.java:180)
	at com.univocity.parsers.common.input.ExpandingCharAppender.appendUntil(ExpandingCharAppender.java:180)
	at com.univocity.parsers.common.input.ExpandingCharAppender.appendUntil(ExpandingCharAppen

ConnectionRefusedError: [Errno 61] Connection refused

In [11]:
# Expand the street-type abbreviations in the property location column
# (CL -> Calle, AV -> Avenida, PZ -> Plaza), one replacement pass per abbreviation
location_col_name = 'Localizacion de la finca (LFI)'
for abbreviation, expanded in (('CL ', 'Calle '), ('AV ', 'Avenida '), ('PZ ', 'Plaza ')):
    catastro_df = catastro_df.withColumn(
        location_col_name,
        regexp_replace(catastro_df[location_col_name], abbreviation, expanded),
    )

# Build the street_name column:
#   * locations mentioning "Calle VELAZQUEZ" or "Avenida EUROPA" are kept unchanged;
#   * every other location is cut just before its first run of digits and
#     suffixed with ", Malaga, España".
location = catastro_df[location_col_name]
keeps_full_location = location.like("%Calle VELAZQUEZ%") | location.like("%Avenida EUROPA%")
street_with_city = concat(regexp_extract(location, r"^(.*?)\s*\d+", 1), lit(', Malaga, España'))
street_names_df = catastro_df.withColumn(
    'street_name',
    when(keeps_full_location, location).otherwise(street_with_city),
)

# One row per distinct street_name value
unique_street_names_df = street_names_df.select("street_name").distinct()

# The Google Geocoding API key is read from the GOOGLE_MAPS_API_KEY
# environment variable instead of being hardcoded, so the secret is not
# stored in the notebook.
# NOTE(review): a literal key used to be committed on this line; it should be
# revoked/rotated.
api_key = os.environ.get("GOOGLE_MAPS_API_KEY", "")


def get_postal_code(address):
    """Look up the postal code of ``address`` with the Google Geocoding API.

    Args:
        address: Free-text address to geocode. ``None``, empty and
            whitespace-only values are not sent to the API.

    Returns:
        The ``long_name`` of the first ``postal_code`` address component of
        the first geocoding result, or ``None`` when the address is blank,
        the response has no results, or no such component is present.

    Raises:
        RuntimeError: if a lookup is attempted while no API key is configured.
    """
    # Nothing to geocode: skip the network round-trip entirely.
    if not address or not address.strip():
        return None
    if not api_key:
        raise RuntimeError(
            "Google Maps API key is not configured; set the GOOGLE_MAPS_API_KEY environment variable."
        )

    # Passing the query through `params` lets requests URL-encode the address
    # (spaces, commas, '&', non-ASCII characters). The timeout keeps a stalled
    # connection from blocking the caller indefinitely.
    response = requests.get(
        "https://maps.googleapis.com/maps/api/geocode/json",
        params={"address": address, "key": api_key},
        timeout=10,
    )
    data = response.json()

    # Payloads without a "results" entry are treated like an empty result
    # list instead of raising KeyError.
    results = data.get("results") or []
    if results:
        for component in results[0].get("address_components", []):
            if "postal_code" in component.get("types", []):
                return component.get("long_name")
    return None

# Wrap the geocoding helper as a Spark UDF; no return type is passed, so the
# udf() default applies.
get_postal_code_udf = udf(get_postal_code)

# Add a postal_code column computed from each distinct street_name
street_name_col = unique_street_names_df["street_name"]
street_names_with_zip_df = unique_street_names_df.withColumn(
    "postal_code",
    get_postal_code_udf(street_name_col),
)

In [14]:
# Preview the cadastral DataFrame: first 20 rows, long cell values truncated
catastro_df.show(n=20, truncate=True)

+--------------------+-------------------------+-----------------------+----------------------+------------------------------+----------------------------+-----------------------+----------------------------------+------------------+----------+--------------------------------------------+-----------------------------------+--------------+------------+------------+--------------------------+------------+
|            consulta|Referencia Catastral (RC)|Codigo provincia (PROV)|Codigo municipio (MUN)|Localizacion de la finca (LFI)|Superficie construida (SUCF)|Superficie suelo (SUPF)|Coeficiente de participacion (CPA)|Tipo de Bien (TIP)| Uso (USO)|Superficie construida metros cuadrados (SUP)|Uso del elemento constructivo (UEC)|Escalera (ESC)|Planta (PLA)|Puerta (PUE)|Superficie catastral (SEC)|UEC analisis|
+--------------------+-------------------------+-----------------------+----------------------+------------------------------+----------------------------+-----------------------+-------

In [12]:
# Shut down the Spark session; DataFrames created from it cannot be
# evaluated afterwards, so this should be the last cell executed.
spark.stop()