In [1]:
! pip install azure-storage-blob requests pandas

Collecting azure-storage-blob
  Downloading azure_storage_blob-12.25.0-py3-none-any.whl.metadata (26 kB)
Collecting azure-core>=1.30.0 (from azure-storage-blob)
  Downloading azure_core-1.32.0-py3-none-any.whl.metadata (39 kB)
Collecting cryptography>=2.1.4 (from azure-storage-blob)
  Downloading cryptography-44.0.2-cp39-abi3-macosx_10_9_universal2.whl.metadata (5.7 kB)
Collecting isodate>=0.6.1 (from azure-storage-blob)
  Downloading isodate-0.7.2-py3-none-any.whl.metadata (11 kB)
Downloading azure_storage_blob-12.25.0-py3-none-any.whl (406 kB)
Downloading azure_core-1.32.0-py3-none-any.whl (198 kB)
Downloading cryptography-44.0.2-cp39-abi3-macosx_10_9_universal2.whl (6.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.7/6.7 MB[0m [31m28.5 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hDownloading isodate-0.7.2-py3-none-any.whl (22 kB)
Installing collected packages: isodate, cryptography, azure-core, azure-storage-blob
Successfully installed azure-core-1.32.0 azure

In [1]:
import os
import json
import time
from collections import defaultdict
from tqdm import tqdm
import hashlib
from datetime import datetime, timedelta
from tqdm import tqdm
import requests as req
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import udf, explode, col, array, lit, transform
from azure.storage.blob import BlobServiceClient
from hdfs import InsecureClient as HdfsClient
from io import BytesIO
from PIL import Image
from azure.storage.filedatalake import DataLakeServiceClient
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [3]:
# Azure credentials come from the environment so that no secret is ever
# written into the notebook.
connection_string = os.getenv('AZURE_CONNECTION_STRING')
if not connection_string:
    # Fail here with an actionable message instead of an opaque error raised
    # from inside the Azure SDK when it tries to parse None.
    raise RuntimeError("AZURE_CONNECTION_STRING environment variable is not set")
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Spark session attached to the standalone master running on this machine.
spark = (
    SparkSession.builder
    .appName("BDMProject")
    .master("spark://localhost:7077")
    .config("spark.driver.host", "127.0.0.1")
    .getOrCreate()
)
# Silence Spark's own logging in the notebook output.
spark.sparkContext.setLogLevel("OFF")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/20 14:28:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Transportation

Find out more at: https://openrouteservice.org/dev/#/api-docs/optimization/post
Alternatives: https://github.com/graphhopper/graphhopper/blob/master/README.md#Map-Matching
https://github.com/VROOM-Project/vroom/blob/master/docs/API.md

In [5]:
from hdfs import InsecureClient

def test_spark_connection() -> bool:
    """
    Verify that the module-level Spark session is responsive.

    Runs a trivial job (counting a one-row range), prints the outcome so it
    shows up in the cell output, and returns it.

    Returns:
        True if the job ran and produced the expected count, False if the
        count was wrong or any exception was raised while running it.
    """
    try:
        # Simple operation: create a one-row DataFrame and count it.
        is_alive = spark.range(0, 1).count() == 1
    except Exception:
        is_alive = False
    print(is_alive)
    return is_alive

def test_hdfs_connection():
    """
    Smoke-test the HDFS endpoint named by the HDFS_URL environment variable.

    Lists the root directory, writes a small probe file under /tmp, reads it
    back and prints each step. Any exception is reported on stdout.

    Returns:
        True when every step succeeded, False otherwise.
    """
    probe_path = '/tmp/test_hdfs_connection.txt'
    probe_text = 'HDFS connection successful!'
    try:
        url = os.environ['HDFS_URL']
        client = InsecureClient(url, user='hdfs')  # use appropriate user

        # Step 1: list the root directory
        print("Listing root directory:")
        print(client.list('/'))

        # Step 2: write the probe file
        client.write(probe_path, data=probe_text, overwrite=True)
        print(f"File written to {probe_path}")

        # Step 3: read the probe file back
        with client.read(probe_path, encoding='utf-8') as reader:
            echoed = reader.read()
            print("File content:")
            print(echoed)
    except Exception as e:
        print(f"❌ HDFS connection failed: {e}")
        return False
    return True

# Run both connectivity checks (HDFS first, then Spark) when the cell executes.
if __name__ == '__main__':
    for connection_check in (test_hdfs_connection, test_spark_connection):
        connection_check()

Listing root directory:
['exploitation_zone', 'tmp']
File written to /tmp/test_hdfs_connection.txt
File content:
HDFS connection successful!
True


# Helper Functions

In [6]:
def build_struct(flat_schema: dict) -> StructType:
    """
    Translate a JSON-style schema description into a Spark ``StructType``.

    Every key of ``flat_schema`` becomes one nullable field. The value
    selects the field type:

    * ``'int'``, ``'float'``, ``'str'``, ``'bool'`` map to IntegerType,
      DoubleType, StringType and BooleanType; any other scalar falls back
      to StringType.
    * a ``dict`` becomes a nested struct (built recursively).
    * a ``list`` becomes an ArrayType of its first element, which may itself
      be a type name, a dict or a list. An empty list is treated as an
      array of strings.

    Args:
        flat_schema: mapping of field name to type specification.

    Returns:
        The corresponding StructType, fields in the mapping's order.
    """
    def _to_spark_type(spec):
        # Containers first: they are unhashable and cannot be looked up in
        # the primitive table below.
        if isinstance(spec, dict):
            return build_struct(spec)
        if isinstance(spec, list):
            return ArrayType(_to_spark_type(spec[0]) if spec else StringType())
        primitives = {
            'int': IntegerType(),
            'float': DoubleType(),
            'str': StringType(),
            'bool': BooleanType(),
        }
        return primitives.get(spec, StringType())

    return StructType([
        StructField(name, _to_spark_type(spec), True)
        for name, spec in flat_schema.items()
    ])

def string_to_sha256(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``text`` encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(text.encode('utf-8'))
    return hasher.hexdigest()

def flatten_df(df):
    """
    Flatten one level of struct columns in a Spark DataFrame.

    Non-struct columns are kept as they are and come first, in schema order.
    Each struct column is then replaced by one column per child field, named
    ``<parent>_<child>``. Structs nested deeper than one level are not
    expanded further.
    """
    from pyspark.sql.types import StructType

    schema_fields = df.schema.fields
    is_struct = lambda f: isinstance(f.dataType, StructType)

    # Plain columns first ...
    selected = [col(f.name) for f in schema_fields if not is_struct(f)]
    # ... then the children of every struct column.
    for parent in [f.name for f in schema_fields if is_struct(f)]:
        selected.extend(
            col(f"{parent}.{child.name}").alias(f"{parent}_{child.name}")
            for child in df.schema[parent].dataType.fields
        )
    return df.select(selected)

# Data Lake configuration
# Assumes 'file_system_name' is the Delta Lake filesystem
def get_data_lake_service(account_url: str, credential) -> DataLakeServiceClient:
    """Create a ``DataLakeServiceClient`` for ``account_url`` using ``credential``."""
    service_client = DataLakeServiceClient(
        account_url=account_url,
        credential=credential,
    )
    return service_client

@udf(StringType())
def sha256_udf(url: str) -> str:
    """Spark UDF: hex SHA-256 of ``url`` (UTF-8), or None for a null/empty value."""
    if not url:
        return None
    encoded = url.encode('utf-8')
    return hashlib.sha256(encoded).hexdigest()

@udf(StringType())
def standardized_hours_udf(ts: str) -> str:
    """Spark UDF: format an ISO timestamp string as ``HH:MM``; None for a null/empty value."""
    if not ts:
        return None
    parsed = datetime.fromisoformat(ts)
    return parsed.strftime('%H:%M')


def file_exists(fs_client, path: str) -> bool:
    """
    Report whether ``path`` exists in the given file system client.

    Any exception raised while obtaining the file client or querying it is
    swallowed and reported as "does not exist".
    """
    try:
        found = fs_client.get_file_client(path).exists()
    except Exception:
        found = False
    return found


def upload_file(fs_client, path: str, data: bytes, overwrite=True) -> None:
    """
    Write ``data`` to ``path`` through the file system client.

    When ``overwrite`` is truthy and the file already exists it is deleted
    first. The file is then created, the bytes are appended at offset 0 and
    flushed at ``len(data)``.
    """
    target = fs_client.get_file_client(path)
    # `exists()` is only queried when overwriting was requested.
    needs_removal = overwrite and target.exists()
    if needs_removal:
        target.delete_file()
    target.create_file()
    size = len(data)
    target.append_data(data, offset=0)
    target.flush_data(size)

# ---------------------------------------------------------------------
# FLATTEN, CAST, SCHEMA AND IMAGE COMPRESSION (unchanged)
# ---------------------------------------------------------------------


def load_json_schema(path: str) -> dict:
    """Read the file at ``path`` and return its parsed JSON content."""
    with open(path) as schema_file:
        raw_text = schema_file.read()
    return json.loads(raw_text)

def compress_image(image_bytes: bytes, max_width: int = 1024, quality: int = 75) -> bytes:
    """
    Re-encode an image as an optimized JPEG, shrinking it if it is too wide.

    Args:
        image_bytes: encoded source image in any format Pillow can open.
        max_width: images wider than this are resized to this width,
            keeping the aspect ratio; narrower images keep their size.
        quality: JPEG quality passed to the encoder.

    Returns:
        The JPEG-encoded bytes.
    """
    # Pixel modes Pillow's JPEG encoder can write directly. Anything else
    # (RGBA, P, LA, PA, I;16, ...) would make `save` fail, so it is converted.
    jpeg_writable_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
    with Image.open(BytesIO(image_bytes)) as img:
        if img.mode not in jpeg_writable_modes:
            img = img.convert("RGB")
        if img.width > max_width:
            ratio = max_width / float(img.width)
            # Never let a very wide, very flat image collapse to 0 pixels high.
            new_height = max(1, int(img.height * ratio))
            img = img.resize((max_width, new_height), Image.LANCZOS)
        buffer = BytesIO()
        img.save(buffer, format="JPEG", quality=quality, optimize=True)
        return buffer.getvalue()

def _nested_columns(fields, parents=(), max_levels=3):
    """Build select-expressions that flatten struct fields up to ``max_levels`` of nesting.

    Top-level leaves keep their name; nested leaves are selected by dotted
    path and aliased with the path joined by underscores. A struct found
    below ``max_levels`` is kept as a single (struct) column.
    """
    columns = []
    for field in fields:
        path = parents + (field.name,)
        if isinstance(field.dataType, StructType) and len(parents) < max_levels:
            columns.extend(_nested_columns(field.dataType.fields, path, max_levels))
        elif parents:
            columns.append(col(".".join(path)).alias("_".join(path)))
        else:
            columns.append(col(field.name))
    return columns


def process_accommodation_record(record: dict, schema: dict) -> dict:
    """
    Flatten one raw accommodation record and coerce it to the official schema.

    Args:
        record: one hotel entry as returned by the accommodation API.
        schema: schema description understood by ``build_struct``. When it is
            empty or None the description is read from
            'accommodation_schema.json' in the working directory.

    Returns:
        The record as a flat dict whose keys and types follow the schema.
        A ``property_photoHash`` value (SHA-256 of each photo URL) is added
        before the schema is applied.
    """
    # Use the schema the caller already loaded; only fall back to the file
    # (opened with a context manager so the handle is closed) when none is given.
    if schema:
        accommodation_schema_dict = schema
    else:
        with open('accommodation_schema.json') as schema_handle:
            accommodation_schema_dict = json.load(schema_handle)
    accommodation_struct = build_struct(accommodation_schema_dict)

    # Let Spark infer the full nested structure of the record.
    df = spark.read.json(spark.sparkContext.parallelize([json.dumps(record)]))

    # Flat columns plus up to 3 levels of nesting.
    df_flat = df.select(*_nested_columns(df.schema.fields))

    # Hash the photo URLs in Python; tolerate a missing or null property/photoUrls.
    photo_urls = (record.get("property") or {}).get("photoUrls") or []
    photo_hashes = [string_to_sha256(u) for u in photo_urls]
    df_final = df_flat.withColumn("property_photoHash", lit(photo_hashes))

    # Serialize and re-read with the official schema to validate/cast.
    json_flat = df_final.toJSON().first()
    df_valid = spark.read.schema(accommodation_struct).json(spark.sparkContext.parallelize([json_flat]))
    return df_valid.collect()[0].asDict()

# --- Process a weather record: reshape the hourly arrays into rows, then apply the schema ---
def process_weather_record(raw: dict, schema: dict) -> list:
    """
    Turn one raw weather response into a list of per-hour row dicts.

    Args:
        raw: API response containing an ``'hourly'`` mapping of equally long
            lists, one of them ``'time'`` holding ISO timestamps.
        schema: schema description understood by ``build_struct``. When it is
            empty or None the description is read from 'weather_schema.json'
            in the working directory.

    Returns:
        One dict per hour, typed according to the schema. The original ISO
        value is kept under ``'timestamp'`` and ``'time'`` becomes ``HH:MM``.
    """
    # Use the schema the caller already loaded; only fall back to the file
    # (opened with a context manager so the handle is closed) when none is given.
    if schema:
        weather_schema_dict = schema
    else:
        with open('weather_schema.json') as schema_handle:
            weather_schema_dict = json.load(schema_handle)
    weather_struct = build_struct(weather_schema_dict)

    # Work on a shallow copy: rewriting 'time' in the caller's dict would make
    # a second call on the same payload fail to parse the timestamps.
    hourly = dict(raw['hourly'])
    hourly['timestamp'] = hourly['time']
    hourly['time'] = [datetime.fromisoformat(t).strftime('%H:%M') for t in hourly['time']]

    # Column-oriented lists -> one dict per hour.
    data_rows = [dict(zip(hourly.keys(), values)) for values in zip(*hourly.values())]

    # Create the DataFrame with the schema applied.
    df_valid = spark.createDataFrame(data_rows, schema=weather_struct)
    return [row.asDict() for row in df_valid.collect()]

# --- Accommodation image processing ---
def process_accommodation_images(photo_urls: list, fs_client, city: str) -> None:
    """
    Make sure every photo exists in both the landing and the trusted zone.

    For each URL the file name is the SHA-256 of the URL. The raw image is
    taken from the landing zone when present, otherwise downloaded and stored
    there; a compressed copy (800 px wide at most, quality 70) is then written
    to the trusted zone. URLs that cannot be downloaded are skipped.

    Args:
        photo_urls: image URLs to process.
        fs_client: data lake file system client.
        city: city folder the images are stored under.
    """
    for url in photo_urls:
        sha = string_to_sha256(url)
        path = f"landing_zone/accommodation_images/{city}/{sha}.jpg"
        trusted_path = f"trusted_zone/accommodation_images/{city}/{sha}.jpg"

        # One remote existence check for the landing copy, reused below.
        in_landing = file_exists(fs_client, path)
        if in_landing and file_exists(fs_client, trusted_path):
            continue

        if in_landing:
            data = fs_client.get_file_client(path).download_file().readall()
        else:
            try:
                # The whole body is needed, so no streaming; the timeout keeps
                # one unresponsive host from hanging the run.
                res = req.get(url, timeout=30)
                res.raise_for_status()
            except req.RequestException:
                # Best effort: skip images that cannot be fetched.
                continue
            data = res.content
            upload_file(fs_client, path, data)

        compressed = compress_image(data, max_width=800, quality=70)
        upload_file(fs_client, trusted_path, compressed)

# HDFS location of the text file listing what has already been backfilled,
# one entry per line.
BACKFILL_LOG = "/exploitation_zone/backfilled_files.txt"

def _load_backfill_log(hdfs_client: InsecureClient):
    """Return the backfill log entries as a list of lines ([] when the log does not exist)."""
    if not hdfs_client.status(BACKFILL_LOG, strict=False):
        return []
    with hdfs_client.read(BACKFILL_LOG) as reader:
        payload = reader.read()
    return payload.decode('utf-8').splitlines()

def _append_to_backfill_log(hdfs_client: InsecureClient, log_entry: str):
    """Add ``log_entry`` to the backfill log unless it is already listed.

    The log is re-read and, when the entry is new, rewritten in full.
    """
    known_entries = _load_backfill_log(hdfs_client)
    if log_entry in known_entries:
        return
    updated_entries = known_entries + [log_entry]
    serialized = "\n".join(updated_entries).encode('utf-8')
    hdfs_client.write(BACKFILL_LOG, serialized, overwrite=True)


def _load_parquet(spark, hdfs_client, data_type, city, filename):
    """
    Load a city's Parquet dataset from the exploitation zone.

    The dataset at ``/exploitation_zone/<data_type>/<city>/<filename>`` is
    copied to ``/tmp/<filename>_read`` and read from there.

    Returns:
        A Spark DataFrame, or None when the dataset does not exist in HDFS yet.
    """
    import shutil

    hdfs_path = f"/exploitation_zone/{data_type}/{city}/{filename}"

    if not hdfs_client.status(hdfs_path, strict=False):
        return None  # The dataset does not exist yet

    # Copy HDFS -> local temporary location.
    local_temp = f"/tmp/{filename}_read"
    # Start from a clean target: the hdfs client downloads *inside* a local
    # path that already exists as a directory, which would leave the previous
    # (stale) copy in place and nest the fresh one underneath it.
    shutil.rmtree(local_temp, ignore_errors=True)
    hdfs_client.download(hdfs_path, local_temp, overwrite=True)

    # Read the Parquet data from the local copy.
    return spark.read.parquet(local_temp)


def _write_parquet_to_hdfs(df, hdfs_client: InsecureClient, data_type: str, city: str, filename: str):
    """
    Replace a city's Parquet dataset in the exploitation zone with ``df``.

    ``df`` is written to ``/tmp/<filename>``, the HDFS target
    ``/exploitation_zone/<data_type>/<city>/<filename>`` is deleted and the
    local copy uploaded in its place. The dataset is then recorded in the
    backfill log as ``<data_type>/<city>/<filename>``.
    """
    local_temp = f"/tmp/{filename}"
    # Overwrite, never append: callers pass the complete dataset, and the
    # local path is shared by every city and every run. Appending would carry
    # rows from earlier writes into this upload.
    df.write.mode('overwrite').parquet(local_temp)
    hdfs_target = f"/exploitation_zone/{data_type}/{city}/{filename}"
    # Delete first so the upload replaces the dataset instead of landing
    # inside an existing directory.
    hdfs_client.delete(hdfs_target, recursive=True)
    hdfs_client.upload(hdfs_target, local_temp, overwrite=True)
    log_entry = f"{data_type}/{city}/{filename}"
    _append_to_backfill_log(hdfs_client, log_entry)

def _read_json_df(dl_client, spark, path):
    """
    Fetch the JSON document stored at ``path`` and load it into a Spark DataFrame.

    A top-level list yields one input record per element; any other document
    is treated as a single record.
    """
    payload = dl_client.get_file_client(path).download_file().readall()
    parsed = json.loads(payload.decode('utf-8'))
    if not isinstance(parsed, list):
        # Single document: wrap it so both shapes are handled the same way
        parsed = [parsed]
    json_lines = list(map(json.dumps, parsed))
    return spark.read.json(spark.sparkContext.parallelize(json_lines))


def _process_parquet_backfill(
    dl_client, hdfs_client, spark,
    data_type, file_paths,
    data_file, log_entries
):
    """
    Merge not-yet-logged trusted-zone JSON files into each city's Parquet dataset.

    ``file_paths`` holds ``(full_path, city)`` tuples. Files whose log key
    (``<data_type>/<city>/<file name>``) is already in ``log_entries`` are
    skipped. For every remaining city the existing dataset named ``data_file``
    is loaded, all new files are unioned onto it, the result is written once
    and each processed file is appended to the backfill log.
    """
    def _log_key(city_name, file_path):
        return f"{data_type}/{city_name}/{file_path.split('/')[-1]}"

    # Bucket the files that still need processing by city
    pending = defaultdict(list)
    for full_path, city in file_paths:
        if _log_key(city, full_path) in log_entries:
            continue
        pending[city].append(full_path)

    for city, paths in pending.items():
        # The current dataset is loaded a single time per city
        existing = _load_parquet(spark, hdfs_client, data_type, city, data_file)
        new_dfs = [
            _read_json_df(dl_client, spark, path)
            for path in tqdm(paths, desc=f"Loading {data_type} files in {city}")
        ]
        if not new_dfs:
            continue

        # Seed with the existing data (if any) plus the first new frame ...
        first_new, remaining = new_dfs[0], new_dfs[1:]
        if existing is None:
            combined = first_new
        else:
            combined = existing.unionByName(first_new, allowMissingColumns=True)
        # ... then fold in the rest
        for df in tqdm(remaining, desc=f"Merging {data_type} files in {city}"):
            combined = combined.unionByName(df, allowMissingColumns=True)

        # Persist once per city, then record every file that went in
        _write_parquet_to_hdfs(combined, hdfs_client, data_type, city, data_file)
        for path in paths:
            _append_to_backfill_log(hdfs_client, _log_key(city, path))

def get_and_sync_accommodation(
    fs,
    hdfs_client: InsecureClient,
    spark: SparkSession,
    start: datetime,
    end: datetime,
    cities: dict,
    query_template: dict,
    headers:dict,
    schema_file: str
):
    """
    Fetch one-night accommodation offers per day and city and push them through all zones.

    For every date from ``start`` to ``end`` (inclusive) and every city in
    ``cities`` (name -> destination id): skip the pair when both the landing
    and the trusted JSON exist; otherwise take the raw JSON from the landing
    zone or request it from ``accommodation_endpoint`` (storing it in the
    landing zone), process the hotel photos, write the processed records to
    the trusted zone and merge them into the city's
    'AccommodationData.parquet' in HDFS.
    """
    schema = load_json_schema(schema_file)
    one_day = timedelta(days=1)
    parquet_name = 'AccommodationData.parquet'
    n_days = (end - start).days + 1
    stay_dates = [start + offset * one_day for offset in range(n_days)]

    for check_in in tqdm(stay_dates):
        arrival = check_in.strftime('%Y-%m-%d')
        departure = (check_in + one_day).strftime('%Y-%m-%d')
        for city, dest_id in tqdm(cities.items()):
            landing_path = f"landing_zone/accommodation/{city}/{arrival}_{departure}.json"
            trusted_path = f"trusted_zone/accommodation/{city}/{arrival}_{departure}.json"

            # Nothing to do when the pair is present in both zones
            already_synced = file_exists(fs, landing_path) and file_exists(fs, trusted_path)
            if already_synced:
                continue

            # Raw JSON: call the API unless the landing zone already has it
            if not file_exists(fs, landing_path):
                params = {
                    **query_template,
                    'dest_id': dest_id,
                    'arrival_date': arrival,
                    'departure_date': departure,
                }
                res = req.get(accommodation_endpoint, params=params, headers=headers)
                time.sleep(10)
                res.raise_for_status()
                data = res.json()
                upload_file(fs, landing_path, json.dumps(data).encode('utf-8'))
            else:
                landed = fs.get_file_client(landing_path).download_file().readall()
                data = json.loads(landed.decode('utf-8'))

            hotels = data['data']['hotels']

            # Images first
            photo_urls = []
            for hotel in hotels:
                photo_urls.extend(hotel['property']['photoUrls'])
            process_accommodation_images(photo_urls, fs, city)

            # Processed records go to the trusted zone
            docs = [process_accommodation_record(hotel, schema) for hotel in hotels]
            upload_file(fs, trusted_path, json.dumps(docs).encode('utf-8'))

            # Merge into the exploitation-zone dataset and write it back
            serialized_docs = [json.dumps(doc) for doc in docs]
            exploitation_df = spark.read.json(spark.sparkContext.parallelize(serialized_docs))
            existing_df = _load_parquet(spark, hdfs_client, 'accommodation', city, parquet_name)
            if existing_df is not None:
                combined = existing_df.unionByName(exploitation_df, allowMissingColumns=True).dropDuplicates()
            else:
                combined = exploitation_df
            _write_parquet_to_hdfs(combined, hdfs_client, 'accommodation', city, parquet_name)

def get_and_sync_weather(
    fs,
    hdfs_client: InsecureClient,
    spark: SparkSession,
    start: datetime,
    end: datetime,
    coords: dict,
    query_template: dict,
    schema_file: str
):
    """
    Fetch historical hourly weather per day and city and push it through all zones.

    For every date from ``start`` to ``end`` (inclusive) the weather of the
    day 365 days earlier is synced for every city in ``coords`` (name ->
    ``{'latitude', 'longitude'}``): skip the pair when both the landing and
    the trusted JSON exist; otherwise take the raw JSON from the landing zone
    or request it from ``weather_endpoint`` (storing it in the landing zone),
    write the processed rows to the trusted zone and merge them into the
    city's 'WeatherData.parquet' in HDFS.
    """
    schema = load_json_schema(schema_file)
    delta = timedelta(days=1)

    for single_date in tqdm([start + i * delta for i in range((end - start).days + 1)]):
        prev_start = (single_date - timedelta(days=365)).strftime('%Y-%m-%d')
        for city, coord in coords.items():
            landing_path = f"landing_zone/weather/{city}/{prev_start}.json"
            # Same layout as every other trusted path (trusted_zone/<type>/<city>/<file>).
            # The former "trusted_zone/landing_zone/weather/..." prefix put the
            # files where neither of the backfill steps looks for them.
            trusted_path = f"trusted_zone/weather/{city}/{prev_start}.json"

            # Filter: nothing to do when the day exists in landing and trusted
            if file_exists(fs, landing_path) and file_exists(fs, trusted_path):
                continue

            # Get the JSON: from the landing zone or from the API
            if file_exists(fs, landing_path):
                raw = fs.get_file_client(landing_path).download_file().readall().decode('utf-8')
                data = json.loads(raw)
            else:
                params = dict(query_template, latitude=coord['latitude'], longitude=coord['longitude'], start_date=prev_start, end_date=prev_start)
                res = req.get(weather_endpoint, params=params)
                time.sleep(10)
                res.raise_for_status()
                data = res.json()
                upload_file(fs, landing_path, json.dumps(data).encode('utf-8'))

            # Transform and store in the trusted zone
            doc = process_weather_record(data, schema)
            upload_file(fs, trusted_path, json.dumps(doc).encode('utf-8'))

            exploitation_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(doc)]))

            # Merge into the exploitation-zone dataset and write it back
            existing_df = _load_parquet(spark, hdfs_client, 'weather', city, 'WeatherData.parquet')
            if existing_df is None:
                combined = exploitation_df
            else:
                combined = existing_df.unionByName(exploitation_df, allowMissingColumns=True).dropDuplicates()
            _write_parquet_to_hdfs(combined, hdfs_client, 'weather', city, 'WeatherData.parquet')

def list_files(fs_client, prefix: str) -> list:
    """Return the names of all non-directory entries the client lists under ``prefix``."""
    file_names = []
    for entry in fs_client.get_paths(path=prefix):
        if entry.is_directory:
            continue
        file_names.append(entry.name)
    return file_names


def find_missing_blobs(
    fs,
    landing_prefix: str
) -> list:
    """
    List landing-zone files that have no counterpart in the trusted zone.

    Files are listed under ``landing_zone/<landing_prefix>``; the counterpart
    of a file is its path with the leading ``landing_zone/`` replaced by
    ``trusted_zone/``. The landing paths are returned in listing order.
    """
    landing_files = list_files(fs, 'landing_zone/' + landing_prefix)
    return [
        landing_file
        for landing_file in landing_files
        if not file_exists(fs, "trusted_zone/" + landing_file.replace('landing_zone/', '', 1))
    ]


def backfill_trusted_zone(
    fs,
    landing_prefix: str,
    schema_file: str = None
) -> None:
    """
    Create the trusted-zone counterpart of every landing file that lacks one.

    Accommodation images are compressed; accommodation and weather JSON files
    are processed record by record. Files under any other landing folder are
    downloaded but not written anywhere.

    Args:
        fs: data lake file system client.
        landing_prefix: folder below ``landing_zone/`` to scan.
        schema_file: optional path of the JSON schema handed to the record
            processors.
    """
    schema = None
    if schema_file:
        schema = load_json_schema(schema_file)

    for landing_path in tqdm(find_missing_blobs(fs, landing_prefix)):
        trusted_path = f"trusted_zone/{landing_path.replace('landing_zone/', '', 1)}"
        data_bytes = fs.get_file_client(landing_path).download_file().readall()

        if landing_path.startswith('landing_zone/accommodation_images/'):
            # Accommodation images
            payload = compress_image(data_bytes, max_width=800, quality=70)
        elif landing_path.startswith('landing_zone/accommodation/'):
            # Accommodation JSON
            data = json.loads(data_bytes.decode('utf-8'))
            docs = [process_accommodation_record(r, schema) for r in data['data']['hotels']]
            payload = json.dumps(docs).encode('utf-8')
        elif landing_path.startswith('landing_zone/weather/'):
            # Weather JSON
            data = json.loads(data_bytes.decode('utf-8'))
            doc = process_weather_record(data, schema)
            payload = json.dumps(doc).encode('utf-8')
        else:
            continue

        upload_file(fs, trusted_path, payload)

def backfill_exploitation_zone(
    dl_client,
    hdfs_client: InsecureClient,
    spark: SparkSession,
    data_type: str,
    data_file: str = None
):
    """
    Copy trusted-zone data of one type into the HDFS exploitation zone.

    Only files laid out exactly as ``trusted_zone/<data_type>/<city>/<file>``
    are considered. For 'accommodation' and 'weather' the JSON files are
    merged into the per-city Parquet dataset named ``data_file``; for
    'accommodation_images' each image not yet in the backfill log is copied
    as is and then logged. Other data types are ignored.
    """
    # Load the log once; a set gives constant-time "already done?" checks.
    log_entries = set(_load_backfill_log(hdfs_client))

    # Keep only files that really sit at trusted_zone/<data_type>/<city>/<file>.
    # Matching on the trailing path components alone also picked up stray
    # files under other prefixes and mapped them to paths that are duplicates
    # or do not exist.
    expected_prefix = ['trusted_zone', data_type]
    qualified = []  # tuples of (full_path, city)
    for path in list_files(dl_client, "trusted_zone/"):
        parts = path.split('/')
        if len(parts) != 4 or parts[:2] != expected_prefix:
            continue
        qualified.append((path, parts[2]))

    if data_type in ("accommodation", "weather"):
        _process_parquet_backfill(
            dl_client, hdfs_client, spark,
            data_type, qualified,
            data_file, log_entries
        )

    elif data_type == "accommodation_images":
        # Direct copy of images
        for full_path, city in tqdm(qualified, desc="Copying images"):
            file_name = full_path.split('/')[-1]
            log_key = f"{data_type}/{city}/{file_name}"
            if log_key in log_entries:
                continue
            data_bytes = dl_client.get_file_client(full_path).download_file().readall()
            hdfs_target = f"/exploitation_zone/accommodation_images/{city}/{file_name}"
            hdfs_client.write(hdfs_target, data_bytes, overwrite=True)
            _append_to_backfill_log(hdfs_client, log_key)


# Pipeline

In [7]:
# City name -> destination id sent as `dest_id` to the accommodation API.
destination_ids = dict(
    Barcelona="-372490",
    Rome="-126693",
    Madrid="-390625",
    Paris="-1456928",
)

# City name -> coordinates in decimal degrees sent to the weather API
# (positive longitude = east of Greenwich, negative = west).
destination_coords = {
    'Barcelona': {'latitude': 41.3874, 'longitude': 2.1686},
    'Paris': {'latitude': 48.8575, 'longitude': 2.3514},
    # Madrid lies WEST of Greenwich; a positive longitude points into the
    # Mediterranean instead. Weather already downloaded for Madrid with the
    # old value belongs to that wrong location.
    'Madrid': {'latitude': 40.4167, 'longitude': -3.7033},
    'Rome': {'latitude': 41.8967, 'longitude': 12.4822}
}

# Endpoints read as module-level globals by get_and_sync_accommodation and
# get_and_sync_weather respectively.
accommodation_endpoint = "https://booking-com15.p.rapidapi.com/api/v1/hotels/searchHotels"
weather_endpoint = 'https://archive-api.open-meteo.com/v1/archive'

# Headers for the accommodation API; both values come from the environment
# (a KeyError is raised here if either variable is unset).
headers = {
    "x-rapidapi-key": os.environ["RAPID_API_KEY"],
    "x-rapidapi-host": os.environ["RAPID_API_HOST"]
}

# Template for accommodation searches. The empty values (dest_id,
# arrival_date, departure_date) are filled in per request.
accommodation_query = dict(
    dest_id='',
    search_type="CITY",
    arrival_date='',
    departure_date='',
    adults="2",
    children_age="0",
    room_qty="1",
    page_number="1",
    units="metric",
    temperature_unit="c",
    languagecode="en-us",
    currency_code="EUR",
)

# Hourly variables requested from the weather archive, comma separated.
weather_metrics = ','.join([
    'temperature_2m',
    'rain',
    'snowfall',
    'precipitation',
    'cloud_cover',
    'wind_speed_10m',
    'sunshine_duration',
])

# Template for weather requests. The empty values (coordinates and dates)
# are filled in per request.
weather_query = dict(
    latitude='',
    longitude='',
    hourly=weather_metrics,
    start_date='',
    end_date='',
)

In [8]:
# Initialize DataLakeServiceClient once
datalake_client = DataLakeServiceClient.from_connection_string(connection_string)
# NOTE(review): `storage_container_name` is not assigned anywhere in the visible
# notebook, so this line raises NameError on a fresh kernel — presumably it was
# defined in a cell that has since been removed; define it before this cell.
file_system_client = datalake_client.get_file_system_client(storage_container_name)
# HdfsClient is the `hdfs.InsecureClient` alias from the import cell; the URL
# comes from the HDFS_URL environment variable.
hdfs_service = HdfsClient(os.environ['HDFS_URL'])

In [65]:
# Parameters: inclusive date range to sync.
start = datetime.strptime('2025-06-29', '%Y-%m-%d')
end = datetime.strptime('2025-06-30', '%Y-%m-%d')
# NOTE(review): `cities` is not used by any visible cell — confirm it is still needed.
cities = list(destination_ids.keys())

# Sync on the fly: accommodation first, then weather. Each call skips
# date/city pairs already present in both the landing and trusted zones.
get_and_sync_accommodation(file_system_client, hdfs_service, spark, start, end, destination_ids, accommodation_query, headers, 'accommodation_schema.json')
get_and_sync_weather(file_system_client, hdfs_service, spark, start, end, destination_coords, weather_query, 'weather_schema.json')

100%|██████████| 4/4 [00:00<00:00, 13.69it/s]
100%|██████████| 4/4 [00:00<00:00, 18.87it/s]
100%|██████████| 2/2 [00:00<00:00,  3.80it/s]
100%|██████████| 2/2 [01:43<00:00, 51.90s/it]                                   


In [None]:
# Create any trusted-zone file that is missing for an existing landing-zone
# file, one landing folder at a time. The image folder needs no schema.
backfill_trusted_zone(file_system_client, 'accommodation/', 'accommodation_schema.json')
backfill_trusted_zone(file_system_client, 'accommodation_images/')
backfill_trusted_zone(file_system_client, 'weather/', 'weather_schema.json')

0it [00:00, ?it/s]
0it [00:00, ?it/s]
0it [00:00, ?it/s]


In [9]:
# Push trusted-zone data into the HDFS exploitation zone: JSON is merged into
# one Parquet dataset per city, images are copied as they are.
backfill_exploitation_zone(file_system_client, hdfs_service, spark, 'accommodation', 'AccommodationData.parquet')
backfill_exploitation_zone(file_system_client, hdfs_service, spark, 'accommodation_images')
backfill_exploitation_zone(file_system_client, hdfs_service, spark, 'weather', 'WeatherData.parquet')

Copying images: 100%|██████████| 2048/2048 [00:00<00:00, 32475.39it/s]
Loading weather files in Barcelona: 100%|██████████| 101/101 [00:37<00:00,  2.71it/s]
Merging weather files in Barcelona: 100%|██████████| 100/100 [00:01<00:00, 55.39it/s]
Loading weather files in Madrid: 100%|██████████| 101/101 [00:29<00:00,  3.39it/s]
Merging weather files in Madrid: 100%|██████████| 100/100 [00:01<00:00, 66.00it/s]
Loading weather files in Paris: 100%|██████████| 101/101 [00:30<00:00,  3.32it/s]
Merging weather files in Paris: 100%|██████████| 100/100 [00:01<00:00, 65.75it/s]
Loading weather files in Rome: 100%|██████████| 101/101 [00:23<00:00,  4.34it/s]
Merging weather files in Rome: 100%|██████████| 100/100 [00:00<00:00, 107.83it/s]
                                                                                

In [22]:
# Sanity check on one exploitation-zone dataset, read straight from HDFS.
data_type = 'weather'
city = 'Barcelona'
filename = 'WeatherData.parquet'
# NOTE(review): the namenode address is hard-coded here while the other cells
# take the HDFS location from the HDFS_URL environment variable.
path = f"hdfs://localhost:9000/exploitation_zone/{data_type}/{city}/{filename}"
# NOTE(review): `limit` is not used by any visible cell.
limit = 20
df = spark.read.parquet(path)
# Distinct timestamps divided by 24 — presumably the number of days covered,
# assuming one row per hour; confirm against the weather schema.
df.select('timestamp').distinct().count()/24

                                                                                

99.0