In [2]:
pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [1]:
import os

from pyspark.sql import SparkSession

# Stop a session left over from an earlier run of this cell so the builder
# below starts from a clean state.
try:
    spark.stop()
except NameError:
    pass  # fresh kernel: `spark` has not been defined yet
except Exception as exc:
    # Best effort: a failed stop must not prevent creating the new session,
    # but it should not go unnoticed either.
    print(f"Ignoring error while stopping the previous Spark session: {exc!r}")

# Object-store credentials are read from the environment. The fallbacks are
# the local development values this notebook used before, so an unconfigured
# environment behaves exactly as it did.
s3_access_key_id = os.environ.get("S3_ACCESS_KEY_ID", "admin")
s3_secret_access_key = os.environ.get("S3_SECRET_ACCESS_KEY", "admin123")

# Spark session with an Iceberg catalog named `rest`, served by a REST catalog
# and storing data through S3FileIO against the configured S3 endpoint.
spark = (
    SparkSession.builder.appName("Iceberg via REST")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.rest.type", "rest")
    .config("spark.sql.catalog.rest.uri", "http://iceberg-rest:8181")
    .config("spark.sql.catalog.rest.warehouse", "s3://lake/warehouse")
    .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.rest.s3.endpoint", "http://minio:9000")
    .config("spark.sql.catalog.rest.s3.path-style-access", "true")
    .config("spark.sql.catalog.rest.s3.access-key-id", s3_access_key_id)
    .config("spark.sql.catalog.rest.s3.secret-access-key", s3_secret_access_key)
    .config("spark.sql.catalog.rest.s3.region", "us-east-1")
    .getOrCreate()
)

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/19 14:53:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/19 14:53:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# List the namespaces exposed by the `rest` catalog, without truncating names.
spark.sql("SHOW NAMESPACES IN rest").show(n=20, truncate=False)

+---------+
|namespace|
+---------+
|raw      |
|silver   |
+---------+



In [7]:
# List the tables registered in the `rest.raw` namespace.
spark.sql("SHOW TABLES IN rest.raw").show(n=20, truncate=False)

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|raw      |avito    |false      |
|raw      |mubawab  |false      |
+---------+---------+-----------+



In [8]:
# Load RAW table

In [9]:
# spark.sql("TRUNCATE TABLE rest.raw.mubawab")

In [10]:
# Row count of the raw Mubawab table.
spark.sql("SELECT COUNT(*) FROM rest.raw.mubawab").show(n=20, truncate=True)

+--------+
|count(1)|
+--------+
|      30|
+--------+



In [69]:
# Lazy handle on the raw Mubawab table; every later step derives from it.
raw_mubawab = spark.read.table("rest.raw.mubawab")

In [70]:
import pandas as pd

# Display settings for inspecting a full raw record.
pd.options.display.max_colwidth = None  # show full column text
pd.options.display.width = None         # no wrapping
pd.options.display.max_rows = 10        # adjust as you like

# Bring a single row to the driver as a pandas frame to inspect the record layout.
raw_mubawab.limit(1).toPandas()

Unnamed: 0,id,payload,ingest_ts
0,8245060,"{""id"": ""8245060"", ""url"": ""https://www.mubawab.ma/fr/a/8245060/appartement-val-fleuri"", ""error"": null, ""listing_type"": ""vente"", ""title"": ""Appartement val fleuri"", ""price"": 1100000, ""location_text"": ""Bel Air - Val fleuri à Tanger"", ""features_amenities_json"": ""[\""Jardin\"", \""Ascenseur\""]"", ""description_text"": ""Bonjour,Je mets en vente un appartement de 100 mètre carré composé de 2 chambres salonsalle de bain toilettescuisine à Val fleuri avec ascenseur vu sur jardin.Le prix est légèrement négociable."", ""features_main_json"": ""{\""Type de bien\"": \""Appartement\"", \""Etat\"": \""Bon état\"", \""Années\"": \""10-20 ans\"", \""Étage du bien\"": \""5ème\"", \""Type du sol\"": \""Carrelage\""}"", ""gallery_urls"": ""[\""https://www.mubawab-media.com/ad/8/245/060F/h/IMG_0736_81335284.avif\"", \""https://www.mubawab-media.com/ad/8/245/060F/h/IMG_0734_81335285.avif\""]"", ""agency_name"": null, ""agency_url"": null}",2025-11-07 17:16:00.004


In [71]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, from_json, split, regexp_replace, trim, when, size

# Schema of the JSON document stored in the `payload` column, based on a
# sample record. StructField is nullable unless told otherwise, so missing
# keys simply come back as NULL.
payload_schema = StructType([
    StructField("id", StringType()),
    StructField("url", StringType()),
    StructField("error", StringType()),
    StructField("listing_type", StringType()),      # "location" | "vente" | etc.
    StructField("title", StringType()),
    StructField("price", DoubleType()),             # already numeric in the sample
    StructField("location_text", StringType()),     # e.g. "Samlalia à Marrakech"
    StructField("features_amenities_json", StringType()),  # JSON array as string
    StructField("description_text", StringType()),
    StructField("features_main_json", StringType()),       # JSON object as string
    StructField("gallery_urls", StringType()),             # JSON array as string
    StructField("agency_name", StringType()),
    StructField("agency_url", StringType()),
    # Not present in the sample; kept in case the scraper adds it later:
    StructField("published_date", StringType(), True),
])

# Parse the outer payload and flatten its fields next to the table columns.
# The frame is derived from the source table rather than from `raw_mubawab`
# itself: this cell rebinds `raw_mubawab` to a frame that no longer has a
# `payload` column, so deriving from the old binding made a second run of the
# cell fail. The table's own id is kept as `record_id` so it can be compared
# with the id found inside the payload.
raw_mubawab = (
    spark.table("rest.raw.mubawab")
    .withColumn("j", from_json(col("payload"), payload_schema))
    .select(
        col("id").alias("record_id"),
        col("ingest_ts"),
        col("j.*")
    )
)

In [72]:

# Compact display settings for wide, flattened frames.
pd.options.display.max_colwidth = 30
pd.options.display.width = 20
pd.options.display.max_rows = 15

# Bring a single row to the driver as a pandas frame to check the flattened columns.
raw_mubawab.limit(1).toPandas()

Unnamed: 0,record_id,ingest_ts,id,url,error,listing_type,title,price,location_text,features_amenities_json,description_text,features_main_json,gallery_urls,agency_name,agency_url,published_date
0,8226084,2025-11-07 20:50:20.003,8226084,https://www.mubawab.ma/fr/...,,location,"Appartement 2ch à louer, b...",8000.0,Bourgogne Ouest à Casablanca,"[""Garage"", ""Ascenseur"", ""M...",Serraj-Immobilier vous pro...,"{""Type de bien"": ""Appartem...","[""https://www.mubawab-medi...",Serraj Immobilier,https://www.mubawab.ma/fr/...,


## id

In [23]:
# List rows where the id inside the payload disagrees with the table's id.
# A plain `!=` evaluates to NULL when either side is NULL (for instance when
# the payload carries no id), and filter() drops NULL results, so such rows
# were silently left out of the check. The null-safe comparison reports them.
raw_mubawab.select("id", "record_id").filter(~col("id").eqNullSafe(col("record_id"))).show(truncate=False)

+---+---------+
|id |record_id|
+---+---------+
+---+---------+



In [73]:
# Drop the table-level id copy; the payload's `id` column is kept.
raw_mubawab = raw_mubawab.drop("record_id")

In [74]:
from pyspark.sql.functions import col, count


# Ids that occur on more than one row, most frequent first.
dupes = (
    raw_mubawab
    .groupBy("id")
    .count()
    .where(col("count") > 1)
    .orderBy("count", ascending=False)
)

dupes.show(n=20, truncate=False)
print("Total duplicate IDs:", dupes.count())

                                                                                

+-------+-----+
|id     |count|
+-------+-----+
|8247849|2    |
|8247808|2    |
|8247873|2    |
|8247815|2    |
|8247565|2    |
|8247567|2    |
|8247757|2    |
|8247911|2    |
|8247315|2    |
|8247278|2    |
|8247318|2    |
|8247275|2    |
|8247159|2    |
|8247142|2    |
|8247150|2    |
|8247187|2    |
|8246777|2    |
|8246784|2    |
|8246780|2    |
|8247493|2    |
+-------+-----+
only showing top 20 rows





Total duplicate IDs: 132


                                                                                

In [75]:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Window over each id, newest ingest_ts first.
# Rows that share both id and ingest_ts are ties; row_number() then keeps an
# arbitrary one of them.
w = Window.partitionBy("id").orderBy(col("ingest_ts").desc())

# Keep only the first row of each window, i.e. the latest record per id.
raw_mubawab = (
    raw_mubawab.withColumn("rn", row_number().over(w))
      .filter(col("rn") == 1)
      .drop("rn")
)

## url

In [76]:
from pyspark.sql.functions import col, trim

# Keep only rows that have a non-blank url.
has_url = col("url").isNotNull() & (trim(col("url")) != "")
raw_mubawab = raw_mubawab.where(has_url)

## price

In [77]:
# Sample of the distinct price values.
raw_mubawab.select("price").distinct().show(n=10, truncate=False)



+---------+
|price    |
+---------+
|300000.0 |
|330000.0 |
|495000.0 |
|4800.0   |
|21500.0  |
|2346000.0|
|1055000.0|
|995000.0 |
|10300.0  |
|90000.0  |
+---------+
only showing top 10 rows



                                                                                

In [78]:
from pyspark.sql.functions import when, col, lit

# Keep strictly positive prices; zero, negative and missing prices become NULL
# (a `when` without `otherwise` yields NULL for rows that do not match).
positive_price = when(col("price") > 0, col("price"))
raw_mubawab = raw_mubawab.withColumn("price", positive_price)

## seller

In [79]:
from pyspark.sql.functions import col, count

# Listings per agency, largest first.
(
    raw_mubawab
    .groupBy("agency_name")
    .count()
    .orderBy("count", ascending=False)
    .show(n=20, truncate=False)
)



+----------------------------------+-----+
|agency_name                       |count|
+----------------------------------+-----+
|NULL                              |2116 |
|Yakeey                            |162  |
|Capital Foncier                   |64   |
|Serraj Immobilier                 |61   |
|TERRE ET MER IMMOBILIER           |47   |
|Les Tailleurs de l'Immobilier     |39   |
|Agence Transaction Foncière       |38   |
|8th Avenue                        |37   |
|EM CONSULT                        |33   |
|Kay Realestate                    |32   |
|Iziloge immobilier                |31   |
|Global Brokerage Real Estate      |30   |
|GUY HOQUET L'IMMOBILIER CASABLANCA|28   |
|Côté foncier                      |27   |
|Cosy Nest immobilier              |27   |
|Notorious                         |26   |
|Castle Agency                     |26   |
|MILKIYA                           |26   |
|Univers foncier                   |25   |
|FACILYKEY                         |25   |
+----------

                                                                                

In [80]:
from pyspark.sql.functions import col, lower, trim, when

# Agency name, trimmed and lower-cased.
agency_clean = lower(trim(col("agency_name")))

# `seller` is the cleaned agency name; missing, blank and placeholder
# spellings all collapse to "unknown". The source column is then dropped.
raw_mubawab = (
    raw_mubawab
    .withColumn(
        "seller",
        when(
            col("agency_name").isNull() | agency_clean.isin("", "nan", "null", "unknown"),
            "unknown",
        ).otherwise(agency_clean),
    )
    .drop("agency_name")
)

## images 

In [81]:
from pyspark.sql.functions import split, trim, col, from_json, expr
from pyspark.sql.types import ArrayType, StringType


# --- MUBAWAB ---
# Decode the JSON-encoded gallery list into an array of strings named
# `images`, then drop the raw string column.
image_list_type = ArrayType(StringType())
raw_mubawab = (
    raw_mubawab
    .withColumn("images", from_json(col("gallery_urls"), image_list_type))
    .drop("gallery_urls")
)

In [21]:
# One distinct `images` value, shown in full.
raw_mubawab.select("images").distinct().show(n=1, truncate=False)



+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|images                                                                                                                                                                                                                                       

                                                                                

## equipments

In [35]:
# Ten most frequent raw amenities strings.
(
    raw_mubawab
    .groupBy("features_amenities_json")
    .count()
    .orderBy("count", ascending=False)
    .show(n=10, truncate=False)
)



+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features_amenities_json                                                                                                                                                                                                                                                                                                             |count|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|

                                                                                

## offre 

In [82]:
# Row count per listing type.
(
    raw_mubawab
    .groupBy("listing_type")
    .count()
    .orderBy("count", ascending=False)
    .show(n=50, truncate=False)
)



+------------+-----+
|listing_type|count|
+------------+-----+
|location    |3300 |
|vente       |2938 |
+------------+-----+



                                                                                

In [83]:
from pyspark.sql.functions import col

# Rename the listing-type column to `offre`.
raw_mubawab = raw_mubawab.withColumnRenamed("listing_type", "offre")

## city & neighborhood

In [39]:
# Five most frequent location strings.
(
    raw_mubawab
    .groupBy("location_text")
    .count()
    .orderBy("count", ascending=False)
    .show(n=5, truncate=False)
)

+----------------------------+-----+
|location_text               |count|
+----------------------------+-----+
|Guéliz à Marrakech          |10   |
|Route Casablanca à Marrakech|7    |
|Oulfa à Casablanca          |6    |
|Dar Bouazza                 |5    |
|Sania à Tanger              |5    |
+----------------------------+-----+
only showing top 5 rows



In [84]:
from pyspark.sql.functions import split, trim, when, col

# Pieces of "location_text" around " à "; a value without the separator
# yields a single piece.
split_col = split(col("location_text"), " à ")
before_sep = trim(split_col.getItem(0))  # text before "à"
after_sep = trim(split_col.getItem(1))   # text after "à", NULL when absent

raw_mubawab = (
    raw_mubawab
    .withColumn("neighborhood", before_sep)
    # No second piece (e.g. only a city name): reuse the first piece as the city.
    .withColumn("city", when(after_sep.isNull(), before_sep).otherwise(after_sep))
)

# Verify results
raw_mubawab.select("location_text", "neighborhood", "city").show(n=10, truncate=False)



+----------------------------+----------------+-----------+
|location_text               |neighborhood    |city       |
+----------------------------+----------------+-----------+
|Asilah                      |Asilah          |Asilah     |
|Agdal à Rabat               |Agdal           |Rabat      |
|Maghrib Arabi à Kénitra     |Maghrib Arabi   |Kénitra    |
|Les Hôpitaux à Casablanca   |Les Hôpitaux    |Casablanca |
|Riyad à Rabat               |Riyad           |Rabat      |
|Oulfa à Casablanca          |Oulfa           |Casablanca |
|Dar Bouazza                 |Dar Bouazza     |Dar Bouazza|
|Route Casablanca à Marrakech|Route Casablanca|Marrakech  |
|Hay Mabrouka à Marrakech    |Hay Mabrouka    |Marrakech  |
|Maghrib Arabi à Kénitra     |Maghrib Arabi   |Kénitra    |
+----------------------------+----------------+-----------+
only showing top 10 rows



                                                                                

In [85]:
# Drop the original location string.
raw_mubawab = raw_mubawab.drop("location_text")

In [86]:
from pyspark.sql.functions import lit

# Tag every row with the constant source name "mubawab".
raw_mubawab = raw_mubawab.withColumn("site", lit("mubawab"))

In [87]:
# Spot-check the `site` column.
raw_mubawab.select("site").show(n=10, truncate=False)



+-------+
|site   |
+-------+
|mubawab|
|mubawab|
|mubawab|
|mubawab|
|mubawab|
|mubawab|
|mubawab|
|mubawab|
|mubawab|
|mubawab|
+-------+
only showing top 10 rows



                                                                                

In [45]:
# Look at the raw main-features JSON strings.
raw_mubawab.select("features_main_json").show(n=10, truncate=False)

[Stage 86:>                                                        (0 + 6) / 10]

+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|features_main_json                                                                                                                                     |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"Type de bien": "Appartement", "Etat": "Bon état"}                                                                                                    |
|{"Type de bien": "Appartement", "Etat": "Bon état", "Étage du bien": "3ème", "Orientation": "Est", "Type du sol": "Carrelage"}                         |
|{"Type de bien": "Appartement", "Etat": "Bon état", "Années": "20-30 ans", "Étage du bien": "3ème"}                                                    |
|{"Type de bien": "Appartement", "Etat": "Bon état", "Années": "10-20 ans", 

                                                                                

In [88]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import MapType, StringType

# Decode the features JSON object as a string-to-string map and read the
# "Type de bien" entry straight from that expression, so no intermediate map
# column has to be added and dropped again.
features_as_map = from_json(col("features_main_json"), MapType(StringType(), StringType()))
raw_mubawab = raw_mubawab.withColumn("property_type", features_as_map["Type de bien"])

# Preview
raw_mubawab.select("features_main_json", "property_type").show(n=5, truncate=False)

                                                                                

+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
|features_main_json                                                                                                                                   |property_type|
+-----------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
|{"Type de bien": "Appartement", "Etat": "Nouveau", "Années": "Moins d'un an"}                                                                        |Appartement  |
|{"Type de bien": "Appartement", "Etat": "À rénover", "Étage du bien": "4ème"}                                                                        |Appartement  |
|{"Type de bien": "Appartement", "Etat": "Bon état", "Années": "10-20 ans", "Étage du bien": "3ème", "Orientation": "Sud", "Type du sol": "Carrelage"}|Appartement  |
|{"T

In [51]:
# Spot-check the extracted property type.
raw_mubawab.select("property_type").show(n=10, truncate=False)

+-------------+
|property_type|
+-------------+
|Appartement  |
|Appartement  |
|Appartement  |
|Appartement  |
|Appartement  |
|Appartement  |
|Appartement  |
|Appartement  |
|Appartement  |
|Appartement  |
+-------------+
only showing top 10 rows



In [53]:
# Look at the raw main-features JSON strings again.
raw_mubawab.select("features_main_json").show(n=10, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|features_main_json                                                                                                                                     |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"Type de bien": "Appartement", "Etat": "Bon état"}                                                                                                    |
|{"Type de bien": "Appartement", "Etat": "Bon état", "Étage du bien": "3ème", "Orientation": "Est", "Type du sol": "Carrelage"}                         |
|{"Type de bien": "Appartement", "Etat": "Bon état", "Années": "20-30 ans", "Étage du bien": "3ème"}                                                    |
|{"Type de bien": "Appartement", "Etat": "Bon état", "Années": "10-20 ans", 

In [89]:
from pyspark.sql.functions import from_json, col, trim
from pyspark.sql.types import MapType, StringType
import unicodedata, re

# 1) Decode the features JSON into a map<string,string> column `feat_map`
feature_map_type = MapType(StringType(), StringType())
raw_mubawab = raw_mubawab.withColumn(
    "feat_map", from_json(col("features_main_json"), feature_map_type)
)

# 2) Gather every distinct key that appears in any row's map (runs a Spark job)
keys_df = raw_mubawab.selectExpr("explode(map_keys(feat_map)) as k").distinct()
keys = [key for (key,) in keys_df.collect() if key is not None]

# 3) Helper that turns a feature key into a safe column name
def sanitize(name: str) -> str:
    """Return `name` as a lower-case, ASCII-only snake_case identifier.

    Accented letters are reduced to their base letter, every run of characters
    other than ASCII letters and digits becomes a single underscore, and
    leading/trailing underscores are removed.
    """
    decomposed = unicodedata.normalize("NFKD", name)
    ascii_only = decomposed.encode("ascii", "ignore").decode("ascii")
    underscored = re.sub(r"[^0-9a-zA-Z]+", "_", ascii_only)
    return underscored.strip("_").lower()

# 4) Create one column per key (values are trimmed, otherwise unchanged).
#    All columns are added in a single withColumns() call instead of one
#    withColumn() per key, which would add one projection to the plan per key.
from pyspark.sql.functions import coalesce

feature_columns = {}
for k in sorted(keys):  # sorted so the column order is the same on every run
    column_name = sanitize(k)
    if not column_name:
        # The key has no ASCII letters or digits, so there is no usable name.
        continue
    value = trim(col("feat_map")[k])
    if column_name in feature_columns:
        # Two keys map to the same name (e.g. differing only by accents):
        # keep whichever one is present instead of letting the last one win.
        value = coalesce(feature_columns[column_name], value)
    feature_columns[column_name] = value

if feature_columns:
    raw_mubawab = raw_mubawab.withColumns(feature_columns)

# 5) Drop the intermediate map; features_main_json is kept at this point.
raw_mubawab = raw_mubawab.drop("feat_map")
# raw_mubawab = raw_mubawab.drop("features_main_json")  # uncomment to drop the raw JSON here

# (Optional) quick peek
# raw_mubawab.select("features_main_json", *feature_columns).show(20, truncate=False)


                                                                                

In [56]:
# Bring a single row to the driver as a pandas frame to check the new feature columns.
raw_mubawab.limit(1).toPandas()

Unnamed: 0,ingest_ts,id,url,error,offre,title,price,description_text,features_main_json,agency_url,...,neighborhood,city,site,property_type,type_de_bien,type_du_sol,etage_du_bien,annees,orientation,etat
0,2025-11-07 17:09:50.015,8246493,https://www.mubawab.ma/fr/...,,vente,Studio à Vendre à Guéliz,1320000.0,Élégance et confort au cœu...,"{""Type de bien"": ""Appartem...",https://www.mubawab.ma/fr/...,...,Guéliz,Marrakech,mubawab,Appartement,Appartement,,,,,Bon état


In [90]:
# Drop the raw main-features JSON string.
raw_mubawab = raw_mubawab.drop("features_main_json")

In [91]:
# Print the current column names and types.
raw_mubawab.printSchema()

root
 |-- ingest_ts: timestamp (nullable = true)
 |-- id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- error: string (nullable = true)
 |-- offre: string (nullable = true)
 |-- title: string (nullable = true)
 |-- price: double (nullable = true)
 |-- features_amenities_json: string (nullable = true)
 |-- description_text: string (nullable = true)
 |-- agency_url: string (nullable = true)
 |-- published_date: string (nullable = true)
 |-- seller: string (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- neighborhood: string (nullable = true)
 |-- city: string (nullable = true)
 |-- site: string (nullable = false)
 |-- property_type: string (nullable = true)
 |-- type_de_bien: string (nullable = true)
 |-- surface_de_la_parcelle: string (nullable = true)
 |-- type_du_sol: string (nullable = true)
 |-- etage_du_bien: string (nullable = true)
 |-- annees: string (nullable = true)
 |-- orientation: string (nullable