In [27]:
# Intialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: Whichever package you want mention here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [28]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [29]:
from pyspark.sql.types import *
housing_schema = StructType([
    StructField('longitude', DoubleType(), True),
    StructField('latitude', DoubleType(), True),
    StructField('housing_median_age', DoubleType(), True),
    StructField('total_rooms', DoubleType(), True),
    StructField('total_bedrooms', DoubleType(), True),
    StructField('population', DoubleType(), True),
    StructField('households', DoubleType(), True),
    StructField('median_income', DoubleType(), True),
    StructField('median_house_value', DoubleType(), True),
    StructField('ocean_proximity', StringType(), True),
])

In [30]:
housing_df = spark.read.format('csv').options(header=True).load('file:///home/talentum/housing/housing.csv', schema=housing_schema)

In [31]:
housing_df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

# Question 1 part a

In [32]:
avg = housing_df.groupBy('ocean_proximity').avg('median_house_value')

In [33]:
avg.show()

+---------------+-----------------------+
|ocean_proximity|avg(median_house_value)|
+---------------+-----------------------+
|         ISLAND|               380440.0|
|     NEAR OCEAN|     249433.97742663656|
|       NEAR BAY|     259212.31179039303|
|      <1H OCEAN|     240084.28546409807|
|         INLAND|     124805.39200122119|
+---------------+-----------------------+



# Question 1 part b

In [34]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# 1. Initialize (using getOrCreate is safer)
spark = SparkSession.builder.getOrCreate()

# 2. Make sure the DataFrame variable name is correct!
# If your data is in 'df', use df. If it's in 'housing_df', use that.
housing_df.createOrReplaceTempView('housing_table')

# 3. Run the SQL
result_df = spark.sql("""
    SELECT 
        ocean_proximity, 
        AVG(median_house_value) AS average_value
    FROM 
        housing_table
    GROUP BY 
        ocean_proximity
""")

# 4. Show the results
result_df.show()

+---------------+------------------+
|ocean_proximity|     average_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



# Question 1 part C


In [35]:
pip install kafka-python

Collecting kafka-python
  Using cached https://files.pythonhosted.org/packages/4a/db/694fd552295ed091e7418d02b6268ee36092d4c93211136c448fe061fe32/kafka_python-2.3.0-py2.py3-none-any.whl
Installing collected packages: kafka-python
Successfully installed kafka-python-2.3.0
Note: you may need to restart the kernel to use updated packages.


In [36]:
import json
from kafka import KafkaProducer

# 1. Define the connection to the Kafka Broker
# Change 'localhost:9092' to your actual broker address
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 2. Prepare the new housing record
new_record = {
    "longitude": -122.23,
    "latitude": 37.88,
    "housing_median_age": 41.0,
    "total_rooms": 880.0,
    "total_bedrooms": 129.0,
    "population": 322.0,
    "households": 126.0,
    "median_income": 8.3252,
    "median_house_value": 452600.0,
    "ocean_proximity": "NEAR BAY"
}

# 3. Publish (send) the record to the topic 'housing_topic'
try:
    producer.send('housing_topic', value=new_record)
    
    # 4. Flush the producer to ensure the message is delivered
    producer.flush()
    print("Record published successfully!")
except Exception as e:
    print(f"Error publishing record: {e}")

Record published successfully!


# Question 1 part d

In [37]:
import json
from kafka import KafkaConsumer


consumer = KafkaConsumer(
    'housing_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id='housing_debug_group',
    # Stop waiting if no message arrives for 5 seconds
    consumer_timeout_ms=5000, 
    value_deserializer=lambda x: json.loads(x.decode('utf-8')) if x else None
)

print("Checking for messages (will timeout in 5 seconds if empty)...")

total_records = 0
try:
    for message in consumer:
        if message.value:
            total_records += 1
            print(f"Record {total_records}: {message.value.get('ocean_proximity')}")
except Exception as e:
    print(f"Error: {e}")

if total_records == 0:
    print("No messages found. Is the Producer running?")
else:
    print(f"Finished! Processed {total_records} total.")

Checking for messages (will timeout in 5 seconds if empty)...
Record 1: NEAR BAY
Record 2: NEAR BAY
Record 3: NEAR BAY
Finished! Processed 3 total.




# Question 1 part E

In [39]:
import json
import time
from kafka import KafkaProducer

# 1. Setup the Producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# 2. Define 3 new records to insert
new_records = [
    {
        "longitude": -122.25, "latitude": 37.85, "housing_median_age": 52.0, 
        "total_rooms": 1274.0, "total_bedrooms": 235.0, "population": 558.0, 
        "households": 219.0, "median_income": 5.6431, "median_house_value": 341300.0, 
        "ocean_proximity": "NEAR BAY"
    },
    {
        "longitude": -118.35, "latitude": 34.05, "housing_median_age": 30.0, 
        "total_rooms": 2500.0, "total_bedrooms": 500.0, "population": 1200.0, 
        "households": 450.0, "median_income": 4.1500, "median_house_value": 250000.0, 
        "ocean_proximity": "<1H OCEAN"
    },
    {
        "longitude": -114.50, "latitude": 33.60, "housing_median_age": 15.0, 
        "total_rooms": 1500.0, "total_bedrooms": 300.0, "population": 800.0, 
        "households": 280.0, "median_income": 2.5000, "median_house_value": 100000.0, 
        "ocean_proximity": "INLAND"
    }
]

print("Starting Insertion...")

# 3. Loop through records and publish
for i, record in enumerate(new_records, 1):
    producer.send('housing_topic', value=record)
    
    # Force the message to be sent immediately
    producer.flush() 
    
    print(f"Iteration {i}: Record inserted successfully.")
    print(f"Current insertion count: {i}")
    
    # Small delay to simulate real-time stream
    time.sleep(1) 

print("\nAll 3 records have been published.")
producer.close()

Starting Insertion...
Iteration 1: Record inserted successfully.
Current insertion count: 1
Iteration 2: Record inserted successfully.
Current insertion count: 2
Iteration 3: Record inserted successfully.
Current insertion count: 3

All 3 records have been published.
