# Managing Big Data for Connected Devices

## 420-N63-NA

## Kawser Wazed Nafi
 ---------------------------------------------------------------------------------------------------------------------------------Singular Value Decompositionression

In this we'll make use of the California Housing data set.

The data contains one row per census block group. A block group is the smallest geographical unit for which the U.S. Census Bureau publishes sample data (a block group typically has a population of 600 to 3,000 people). In this sample a block group on average includes 1425.5 individuals living in a geographically compact area.

These spatial data contain 20,640 observations on housing prices with 9 economic variables:

#### Longitude
refers to the angular distance of a geographic place north or south of the earth’s equator for each block group

#### Latitude 
refers to the angular distance of a geographic place east or west of the earth’s equator for each block group

#### Housing Median Age
is the median age of the people that belong to a block group. Note that the median is the value that lies at the midpoint of a frequency distribution of observed values
#### Total Rooms
is the total number of rooms in the houses per block group

#### Total Bedrooms
is the total number of bedrooms in the houses per block group

#### Population
is the number of inhabitants of a block group

#### Households
refers to units of houses and their occupants per block group

#### Median Income
is used to register the median income of people that belong to a block group

#### Median House Value
is the dependent variable and refers to the median house value per block group

In [1]:
# Install dependent libraries
!pip install mllib
!pip install pandas
!pip install numpy



### Library Import

In [2]:
import os

import pandas as pd

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

from sklearn.decomposition import TruncatedSVD
from sklearn.preprocessing import StandardScaler

### Spark Session Creation with SparkSQL

In [3]:
# Spark Session Creation

ss = SparkSession.builder.master("local[4]").appName("Linear-Regression").getOrCreate()
sc = ss.sparkContext
sqlContext = SQLContext(sc)

# Load data from file into dataframe
HOUSING_DATA = 'input/cal_housing.data'
# define the schema, corresponding to a line in the csv data file.
schema = StructType([
    StructField("long", FloatType(), nullable=True),
    StructField("lat", FloatType(), nullable=True),
    StructField("medage", FloatType(), nullable=True),
    StructField("totrooms", FloatType(), nullable=True),
    StructField("totbdrms", FloatType(), nullable=True),
    StructField("pop", FloatType(), nullable=True),
    StructField("houshlds", FloatType(), nullable=True),
    StructField("medinc", FloatType(), nullable=True),
    StructField("medhv", FloatType(), nullable=True)]
)
# Load housing data
housing_df = ss.read.csv(path=HOUSING_DATA, schema=schema).cache()

# Inspect first five rows
housing_df.show(5)



+-------+-----+------+--------+--------+------+--------+------+--------+
|   long|  lat|medage|totrooms|totbdrms|   pop|houshlds|medinc|   medhv|
+-------+-----+------+--------+--------+------+--------+------+--------+
|-122.23|37.88|  41.0|   880.0|   129.0| 322.0|   126.0|8.3252|452600.0|
|-122.22|37.86|  21.0|  7099.0|  1106.0|2401.0|  1138.0|8.3014|358500.0|
|-122.24|37.85|  52.0|  1467.0|   190.0| 496.0|   177.0|7.2574|352100.0|
|-122.25|37.85|  52.0|  1274.0|   235.0| 558.0|   219.0|5.6431|341300.0|
|-122.25|37.85|  52.0|  1627.0|   280.0| 565.0|   259.0|3.8462|342200.0|
+-------+-----+------+--------+--------+------+--------+------+--------+
only showing top 5 rows



### Loaded Data View

In [4]:
# show the dataframe columns
housing_df.columns

# show the schema of the dataframe
housing_df.printSchema()

# run a sample selection
housing_df.select('pop','totbdrms').show(10)

# group by housingmedianage and see the distribution
result_df = housing_df.groupBy("medage").count().sort("medage", ascending=False)

result_df.show(10)

root
 |-- long: float (nullable = true)
 |-- lat: float (nullable = true)
 |-- medage: float (nullable = true)
 |-- totrooms: float (nullable = true)
 |-- totbdrms: float (nullable = true)
 |-- pop: float (nullable = true)
 |-- houshlds: float (nullable = true)
 |-- medinc: float (nullable = true)
 |-- medhv: float (nullable = true)

+------+--------+
|   pop|totbdrms|
+------+--------+
| 322.0|   129.0|
|2401.0|  1106.0|
| 496.0|   190.0|
| 558.0|   235.0|
| 565.0|   280.0|
| 413.0|   213.0|
|1094.0|   489.0|
|1157.0|   687.0|
|1206.0|   665.0|
|1551.0|   707.0|
+------+--------+
only showing top 10 rows

+------+-----+
|medage|count|
+------+-----+
|  52.0| 1273|
|  51.0|   48|
|  50.0|  136|
|  49.0|  134|
|  48.0|  177|
|  47.0|  198|
|  46.0|  245|
|  45.0|  294|
|  44.0|  356|
|  43.0|  353|
+------+-----+
only showing top 10 rows



### Features Selection And Label Creation

In [5]:
# Separate features and target column
feature_columns = ["long", "lat", "medage", "totrooms", "totbdrms", "pop", "houshlds", "medinc"]
label_column = "medhv"

features_df = housing_df.select(*feature_columns)
label_df = housing_df.select(label_column)

### Spark Frame to to Pandas dataframe conversion

As it is hard to locate data and extract them in Spark Dataframe, we temporarily need to convert the Spark Dataframe to Pandas dataframe. So, we are going to take the values loaded to RDD and work in local memory.

In [6]:
# Convert Spark DataFrame to Pandas DataFrame for SVD
features_pd = features_df.toPandas()

# Normalize the features
scaler = StandardScaler()
features_scaled = scaler.fit_transform(features_pd)

### Apply SVD for Dimensonality Reduction

In [7]:
# Apply SVD for dimensionality reduction
n_components = 3  # Number of dimensions to reduce to
svd = TruncatedSVD(n_components=n_components)
reduced_features = svd.fit_transform(features_scaled)

### Convert Local Dataframe to Spark Dataframe

In [8]:
# Convert the reduced features back to a Spark DataFrame
reduced_columns = [f"Component_{i+1}" for i in range(n_components)]
reduced_df = pd.DataFrame(reduced_features, columns=reduced_columns)
reduced_spark_df = ss.createDataFrame(reduced_df)

### Add the label back to Spark Dataframe

In [None]:
# Add an index column to the reduced Spark DataFrame
reduced_spark_df = reduced_spark_df.withColumn("index", F.monotonically_increasing_id())

# Add an index column to the label DataFrame
label_df = label_df.withColumn("index", F.monotonically_increasing_id())

# Join the reduced features with the target column
final_spark_df = reduced_spark_df.join(label_df, on="index").drop("index")

# Show the reduced dataset
final_spark_df.show(5)

Py4JJavaError: An error occurred while calling o130.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 9.0 failed 1 times, most recent failure: Lost task 1.0 in stage 9.0 (TID 9) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:698)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:663)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:543)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 34 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:701)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:745)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:698)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:663)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:639)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:585)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:543)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 34 more


Check the dataframe now. We started with 8 features column and reduced it to 3 columns only. This is call dimension reduction.

In advanced data analytics, using heatmap people find out the features coefficient and decide which features are going to be combine to create the reduced dataframe.

### Save new dataframe to csv file

In [10]:
# Define the output path for the CSV file
output_path = "reduced_dataset.csv"

# Save the DataFrame as a CSV file
final_spark_df.coalesce(1).write.csv(output_path, header=True, mode="overwrite")

print(f"Final dataset saved to {output_path}")


Final dataset saved to reduced_dataset.csv


## Exercise 01
Can you find out which features are merged under each component from the given california housing dataset? Please list them carefully.

In [11]:
import pandas as pd

# Get feature names (original dataset features)
feature_names = ["long", "lat", "medage", "totrooms", "totbdrms", "pop", "houshlds", "medinc"]

# Create a DataFrame with SVD component loadings
components_df = pd.DataFrame(svd.components_, columns=feature_names, index=[f"Component_{i+1}" for i in range(n_components)])

# Display the component-feature matrix
print("Feature Contribution per Principal Component:")
print(components_df)

# Identify the top contributing features for each component
top_features_per_component = components_df.apply(lambda x: x.abs().nlargest(3).index.tolist(), axis=1)
print("\nTop 3 Features for Each Component:")
print(top_features_per_component)

Feature Contribution per Principal Component:
                 long       lat    medage  totrooms  totbdrms       pop  \
Component_1  0.075640 -0.073022 -0.218499  0.483771  0.490501  0.471968   
Component_2 -0.701256  0.701977  0.016027  0.074611  0.060715  0.026036   
Component_3 -0.055776  0.012536 -0.393864  0.093020 -0.117157 -0.116248   

             houshlds    medinc  
Component_1  0.491718  0.045144  
Component_2  0.063521 -0.035300  
Component_3 -0.109440  0.890917  

Top 3 Features for Each Component:
Component_1    [houshlds, totbdrms, totrooms]
Component_2             [lat, long, totrooms]
Component_3        [medinc, medage, totbdrms]
dtype: object


## Exercise 02
Can you list 4 points on why do we need to reduce the dataset, whenever required? Please explain the points properly with example, if needed

### ANSWER:

Reducing the dataset (e.g., using dimensionality reduction techniques like SVD or PCA) is crucial for better performance and interpretability.<br> 

#### Here are four main reasons:

1. Improves Computational Efficiency<br>
Reason: Large datasets require more memory and processing power, leading to slower computations.<br>
Example: In machine learning, training a model on 1 million features would be much slower than training on a reduced subset of 100 features.<br>
2. Removes Redundant and Irrelevant Features<br>
Reason: Some features may be highly correlated or provide little additional information.<br>
Example: In a real estate dataset, total_rooms and total_bedrooms are strongly correlated, so keeping both is unnecessary.<br>
3. Reduces Overfitting and Improves Model Generalization<br>
Reason: Too many features can cause overfitting, where a model learns noise instead of patterns.<br>
Example: In a spam detection model, if we use thousands of email metadata features, the model might learn dataset-specific noise rather than general spam indicators.<br>
4. Enhances Interpretability<br>
Reason: A smaller set of features makes it easier to analyze and explain results.<br>
Example: In finance, reducing hundreds of stock market indicators to 3 key components (e.g., "market trend", "volatility", "sector growth") makes insights more actionable.<br>
Conclusion<br>
Reducing the dataset boosts performance, removes unnecessary noise, prevents overfitting, and makes results easier to interpret.

## Exercise 03
You are working with insurance.csv file on which after encoding the string values you are supposed to get a list of features (more than 10). It is now a dataset with big dimension (more than 10). Can you please reduce the dimension of the dataset to 4 columns/features? Please print the state of before dimension reduction and after dimension reduction state of the dataset 

In [12]:
import pandas as pd
from sklearn.decomposition import TruncatedSVD
from sklearn.preprocessing import StandardScaler

# Load the dataset
df = pd.read_csv("insurance.csv")

# Encode categorical variables (if any)
df_encoded = pd.get_dummies(df, drop_first=True)

# Standardize the dataset
scaler = StandardScaler()
df_scaled = scaler.fit_transform(df_encoded)

# Print shape before dimensionality reduction
print(f"Before Dimension Reduction: {df_scaled.shape}")  # (rows, features)

# Apply SVD to reduce to 4 features
n_components = 4
svd = TruncatedSVD(n_components=n_components)
df_reduced = svd.fit_transform(df_scaled)

# Convert back to DataFrame
df_reduced = pd.DataFrame(df_reduced, columns=[f"Component_{i+1}" for i in range(n_components)])

# Print shape after dimensionality reduction
print(f"After Dimension Reduction: {df_reduced.shape}")  # (rows, 4)

# Display first 5 rows
df_reduced.head()

Before Dimension Reduction: (1338, 9)
After Dimension Reduction: (1338, 4)


Unnamed: 0,Component_1,Component_2,Component_3,Component_4
0,0.506193,-0.868697,1.712276,-2.086279
1,-0.449763,2.09701,-0.796116,-1.252142
2,-0.088781,1.755718,-0.62587,-0.164003
3,-0.594631,-1.672266,-1.292505,-0.760977
4,-1.317052,-0.880339,-1.33658,-0.505571
