<a href="https://colab.research.google.com/github/Naveen-S6/Python_practice/blob/main/pyspark_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 🧑🏼‍🔧 PySpark Setup

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark
!pip install -q delta-spark==3.0.0

In [2]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [3]:
import findspark
findspark.init()
from delta.tables import DeltaTable

In [4]:
from pyspark.sql import SparkSession


spark = (
    SparkSession.builder
    .appName("Airbnb_Cleanup")
    .master("local[*]")
    .config("spark.sql.repl.eagerEval.enabled", True)
    # Delta 3.x publishes its Spark jar as delta-spark (formerly delta-core);
    # keep this version in sync with the pip-installed delta-spark==3.0.0
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

spark

In [7]:
# 🧮 Generate large Airbnb listings.json dataset (500+ records)
import json, random

# -----------------------------
# Configuration
# -----------------------------
num_records = 600  # you can adjust to 500, 1000, etc.

amenities_pool = [
    "Wifi", "Kitchen", "Washer", "Dryer", "TV", "Essentials", "Air conditioning",
    "Heating", "Pool", "Hot tub", "Balcony", "Garden", "Parking", "Fireplace",
    "Sea view", "Mountain view", "Pet friendly", "Gym", "Breakfast", "Workspace"
]

property_types = [
    "Studio Apartment", "Private Room", "Entire Home", "Cottage", "Villa",
    "Cabin", "Loft", "Guest Suite", "Bungalow", "Condo"
]

cities = ["Mumbai", "Bangalore", "Hyderabad", "Chennai", "Pune", "Delhi", "Goa"]

boolean_variants = [True, False, "true", "false", "Yes", "No", "yes", "no", "TRUE", "FALSE"]

# -----------------------------
# Data generation logic
# -----------------------------
data = []

for i in range(1, num_records + 1):
    record = {
        "id": 100 + i,
        "name": f"{random.choice(['Cozy', 'Modern', 'Luxury', 'Spacious', 'Budget'])} "
                f"{random.choice(property_types)} in {random.choice(cities)}",
        "amenities": random.sample(amenities_pool, random.randint(3, 8)),
        "has_parking": random.choice(boolean_variants),
        "is_superhost": random.choice(boolean_variants)
    }
    data.append(record)

# -----------------------------
# Write to JSON file (line-delimited)
# -----------------------------
file_path = "/content/listings.json"

with open(file_path, "w") as f:
    for row in data:
        json.dump(row, f)
        f.write("\n")

print(f"✅ Generated {len(data)} Airbnb listings at {file_path}")

# Quick sanity check
!head -n 5 /content/listings.json


✅ Generated 600 Airbnb listings at /content/listings.json
{"id": 101, "name": "Spacious Condo in Bangalore", "amenities": ["Balcony", "TV", "Gym", "Hot tub", "Pool", "Garden", "Parking", "Washer"], "has_parking": true, "is_superhost": "Yes"}
{"id": 102, "name": "Luxury Villa in Mumbai", "amenities": ["Essentials", "Mountain view", "Fireplace", "Dryer", "Parking", "Gym"], "has_parking": "Yes", "is_superhost": "Yes"}
{"id": 103, "name": "Cozy Loft in Chennai", "amenities": ["Pool", "Hot tub", "Breakfast", "Heating", "Dryer", "Balcony"], "has_parking": "FALSE", "is_superhost": "FALSE"}
{"id": 104, "name": "Cozy Bungalow in Mumbai", "amenities": ["Breakfast", "TV", "Washer"], "has_parking": false, "is_superhost": "Yes"}
{"id": 105, "name": "Budget Villa in Hyderabad", "amenities": ["Parking", "Breakfast", "Mountain view", "Workspace", "Washer", "Heating"], "has_parking": true, "is_superhost": "true"}
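Before handing the file to Spark, a plain-Python pass over the same JSON Lines file can give reference numbers to check the Silver output against: how many listings there are, how many rows the explode should produce (one per amenity), and which raw boolean spellings actually occur. This is a minimal sketch; `summarize_listings` is a hypothetical helper, not part of any library.

```python
import json
from collections import Counter


def summarize_listings(lines):
    """Summarize JSON Lines records shaped like the generated listings.

    Returns (listing count, expected row count after exploding `amenities`,
    Counter of the raw values seen in the two boolean-like columns).
    """
    listings = 0
    amenity_rows = 0
    raw_flags = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        record = json.loads(line)
        listings += 1
        amenities = record.get("amenities")
        # Only real JSON arrays contribute exploded rows in this sketch
        amenity_rows += len(amenities) if isinstance(amenities, list) else 0
        for key in ("has_parking", "is_superhost"):
            # repr() keeps JSON booleans (True) distinct from strings ('true')
            raw_flags[repr(record.get(key))] += 1
    return listings, amenity_rows, raw_flags


# Usage in the notebook:
# with open("/content/listings.json") as f:
#     n, rows, flags = summarize_listings(f)
# print(n, rows, flags.most_common())
```

For the generated file, the second value should equal `df_final.count()` once the solution has run, since every listing has at least three amenities and none is dropped by the explode.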



# ❓ Scenario Question: Airbnb - Clean Listing Amenities (PySpark) [Easy]



---

## 🗂️ Scenario

You are working with raw Airbnb listing data ingested from multiple sources.  
Each listing contains details about the property and a nested list of amenities.  
The goal is to clean and normalize this data for easier analysis downstream.

The data is available as a JSON file (`listings.json`) in the **Bronze layer**, which now needs to be processed into a structured **Silver table**.

---

## 🎯 Task

Perform the following transformations:

1. **Read** the input data from `listings.json` using Spark.  
2. **Explode** the `amenities` array so that each row contains a single amenity.  
3. **Normalize** boolean-like columns (e.g., `"true"`, `"false"`, `"yes"`, `"no"`) to proper `true`/`false` Spark booleans.  
4. **Rename** or select only the relevant columns for downstream use.  
5. **Write** the cleaned result as a **Delta table**.

---

## 🧩 Assumptions

- Input file `listings.json` is already available in the `/content/` directory.  
- The `amenities` field may contain an array or a string representation of an array.  
- Boolean columns may contain a mix of lowercase/uppercase strings or actual booleans.  
- Only essential columns (`id`, `name`, `amenity`, and boolean columns) are required in the output.  
- If a column is missing or malformed, handle it gracefully (e.g., return `null` for that value instead of failing the job).

---

## 📦 Deliverables

- **Output Location:** `/content/silver/listing_amenities`  
- **Output Format:** Delta  
- **Expected Columns:** `id`, `name`, `amenity`, `has_parking`, `is_superhost`

---

## 🧠 Notes

- Use `pyspark.sql.functions.explode()` to expand the amenities array.  
- Use `F.col().cast("boolean")` or `F.when()` for type normalization.  
- Maintain clear naming consistency in column aliases.  
- After writing, the Delta table should be readable with `spark.read.format("delta").load("/content/silver/listing_amenities")`.

---



## 🛢️ Input data

In [8]:
# 🧮 Load and preview the sample dataset
df = spark.read.json("/content/listings.json")
df.show(5, truncate=False)

+---------------------------------------------------------------+-----------+---+------------+---------------------------+
|amenities                                                      |has_parking|id |is_superhost|name                       |
+---------------------------------------------------------------+-----------+---+------------+---------------------------+
|[Balcony, TV, Gym, Hot tub, Pool, Garden, Parking, Washer]     |true       |101|Yes         |Spacious Condo in Bangalore|
|[Essentials, Mountain view, Fireplace, Dryer, Parking, Gym]    |Yes        |102|Yes         |Luxury Villa in Mumbai     |
|[Pool, Hot tub, Breakfast, Heating, Dryer, Balcony]            |FALSE      |103|FALSE       |Cozy Loft in Chennai       |
|[Breakfast, TV, Washer]                                        |false      |104|Yes         |Cozy Bungalow in Mumbai    |
|[Parking, Breakfast, Mountain view, Workspace, Washer, Heating]|true       |105|true        |Budget Villa in Hyderabad  |
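+---------------------------------------------------------------+-----------+---+------------+---------------------------+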

# 📝 Your Solution

In [9]:
# ✍️ Your Solution Here

from pyspark.sql import functions as F

# Steps:
# 1. Read the JSON file
# 2. Explode the amenities
# 3. Normalize boolean-like fields
# 4. Select the required columns and return the DataFrame
# 5. Write the result as a Delta table


In [10]:
# -------------------------------------------------------
# 🧠 Spark Practice Challenge #1: Airbnb Listing Cleanup
# -------------------------------------------------------

from pyspark.sql import functions as F

# 1️⃣ Read the raw JSON file
df = spark.read.json("/content/listings.json")

# 2️⃣ Explode the amenities array
df_exploded = df.withColumn("amenity", F.explode("amenities"))

# 3️⃣ Normalize boolean-like fields
# Define a helper function to convert messy boolean formats to true/false
def normalize_boolean(col):
    return (
        F.when(F.lower(F.col(col)).isin("true", "yes", "y", "1"), True)
         .when(F.lower(F.col(col)).isin("false", "no", "n", "0"), False)
         .otherwise(F.col(col).cast("boolean"))
    )

df_cleaned = (
    df_exploded
    .withColumn("has_parking", normalize_boolean("has_parking"))
    .withColumn("is_superhost", normalize_boolean("is_superhost"))
)

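# 4️⃣ Select required columns
df_final = df_cleaned.select("id", "name", "amenity", "has_parking", "is_superhost")

# 5️⃣ Write the cleaned result as a Delta table (Silver layer)
df_final.write.format("delta").mode("overwrite").save("/content/silver/listing_amenities")
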
In [11]:
df_final.show(5)

+---+--------------------+-------+-----------+------------+
| id|                name|amenity|has_parking|is_superhost|
+---+--------------------+-------+-----------+------------+
|101|Spacious Condo in...|Balcony|       true|        true|
|101|Spacious Condo in...|     TV|       true|        true|
|101|Spacious Condo in...|    Gym|       true|        true|
|101|Spacious Condo in...|Hot tub|       true|        true|
|101|Spacious Condo in...|   Pool|       true|        true|
+---+--------------------+-------+-----------+------------+
only showing top 5 rows

