<a href="https://colab.research.google.com/github/PreethiMachineMindsAcademy/Air-BNB-PricePrediction-Regression/blob/main/Welcome_To_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Welcome to Colab!

In [1]:
!pip install requests gradio



In [7]:
!pip install transformers torch accelerate gradio



In [5]:
!pip install gradio==3.50.2 requests --upgrade --force-reinstall

Collecting gradio==3.50.2
  Downloading gradio-3.50.2-py3-none-any.whl.metadata (17 kB)
Collecting requests
  Downloading requests-2.32.5-py3-none-any.whl.metadata (4.9 kB)
Collecting aiofiles<24.0,>=22.0 (from gradio==3.50.2)
  Downloading aiofiles-23.2.1-py3-none-any.whl.metadata (9.7 kB)
Collecting altair<6.0,>=4.2.0 (from gradio==3.50.2)
  Downloading altair-5.5.0-py3-none-any.whl.metadata (11 kB)
Collecting fastapi (from gradio==3.50.2)
  Downloading fastapi-0.135.0-py3-none-any.whl.metadata (30 kB)
Collecting ffmpy (from gradio==3.50.2)
  Downloading ffmpy-1.0.0-py3-none-any.whl.metadata (3.0 kB)
Collecting gradio-client==0.6.1 (from gradio==3.50.2)
  Downloading gradio_client-0.6.1-py3-none-any.whl.metadata (7.1 kB)
Collecting httpx (from gradio==3.50.2)
  Downloading httpx-0.28.1-py3-none-any.whl.metadata (7.1 kB)
Collecting huggingface-hub>=0.14.0 (from gradio==3.50.2)
  Downloading huggingface_hub-1.5.0-py3-none-any.whl.metadata (13 kB)
Collecting importlib-resources<7.0,>=1.

In [21]:
def generate_pyspark_code(source_table):
    return f"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month, sum

spark = SparkSession.builder \
    .appName("DealerMonthlySales") \
    .getOrCreate()

# Read Iceberg table
df = spark.read.format("iceberg").load("{source_table}")

# Handle null values
df_clean = df.filter(col("price").isNotNull())

# Extract month from sale_date
df_transformed = df_clean.withColumn("month", month(col("sale_date")))

# Aggregate total sales per dealer per month
df_agg = df_transformed.groupBy("dealer_id", "month") \
    .agg(sum("price").alias("total_sales"))

# Write output in Iceberg format partitioned by month
df_agg.write.format("iceberg") \
    .mode("overwrite") \
    .partitionBy("month") \
    .save("dealer_monthly_sales")

spark.stop()
"""


def run_agent(source, target):

    # ----- PLANNER (LLM Reasoning) -----
    plan_prompt = f"""
You are an expert Data Architect.

Task:
Design an ETL pipeline.

Source Table:
{source}

Target Requirement:
{target}

Provide:
1. Extraction step
2. Transformation steps
3. Aggregation logic
4. Storage format
5. Partition strategy

Answer in clear numbered steps.
"""
    plan = generate_response(plan_prompt)

    # ----- GENERATOR (Deterministic Template) -----
    source_table_name = "nsc_sales"
    code = generate_pyspark_code(source_table_name)

    # ----- REVIEWER (Optional LLM Refinement) -----
    review_prompt = f"""
Review this PySpark code and suggest improvements for:
- Performance
- Scalability
- Iceberg best practices

Code:
{code}

Return only suggestions (not rewritten code).
"""
    review_notes = generate_response(review_prompt)

    print("==== ETL PLAN (LLM) ====\n")
    print(plan)

    print("\n==== GENERATED PYSPARK CODE (Deterministic) ====\n")
    print(code)

    print("\n==== REVIEW SUGGESTIONS (LLM) ====\n")
    print(review_notes)

In [23]:
source = """
nsc_sales
dealer_id (int)
model_id (int)
price (double)
sale_date (date)
"""

target = """
Create monthly dealer sales summary.
Aggregate total sales per dealer per month.
Store result in Iceberg format.
"""

run_agent(source, target)

==== ETL PLAN (LLM) ====

1. Extraction 2. Transformation 3. Aggregation logic 4. Iceberg 5. Partition strategy

==== GENERATED PYSPARK CODE (Deterministic) ====


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month, sum

spark = SparkSession.builder     .appName("DealerMonthlySales")     .getOrCreate()

# Read Iceberg table
df = spark.read.format("iceberg").load("nsc_sales")

# Handle null values
df_clean = df.filter(col("price").isNotNull())

# Extract month from sale_date
df_transformed = df_clean.withColumn("month", month(col("sale_date")))

# Aggregate total sales per dealer per month
df_agg = df_transformed.groupBy("dealer_id", "month")     .agg(sum("price").alias("total_sales"))

# Write output in Iceberg format partitioned by month
df_agg.write.format("iceberg")     .mode("overwrite")     .partitionBy("month")     .save("dealer_monthly_sales")

spark.stop()


==== REVIEW SUGGESTIONS (LLM) ====

- Scalability and Iceberg best practices
