
## Overview

This notebook shows how to create and query a table or DataFrame from a file uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which lets you store data for querying inside Databricks. The notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python**, so the default cell type is Python. However, you can switch a cell to another language with the `%LANGUAGE` magic command (for example, `%sql`). Python, Scala, SQL, and R are all supported.

***Assignment 3***

*Uploading the data*

In [None]:
%sh
git add your_file.sql
git commit -m "Add SQL file"
git push

In [None]:
# File location and type
file_location = "/FileStore/tables/iot_devices.json"
file_type = "json"

# JSON records carry their own field names, so Spark infers the schema when loading.
# CSV-specific options such as "header" and "sep" do not apply to JSON and are omitted.
df = spark.read.format(file_type) \
  .load(file_location)

df.show(10)

+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|           cn|device_id|         device_name|humidity|             ip|latitude|   lcd|longitude|  scale|temp|    timestamp|
+-------------+---------+----+----+-------------+---------+--------------------+--------+---------------+--------+------+---------+-------+----+-------------+
|            8|      868|  US| USA|United States|        1|meter-gauge-1xbYRYcj|      51|   68.161.225.1|    38.0| green|    -97.0|Celsius|  34|1458444054093|
|            7|     1473|  NO| NOR|       Norway|        2|   sensor-pad-2n2Pea|      70|  213.161.254.1|   62.47|   red|     6.15|Celsius|  11|1458444054119|
|            2|     1556|  IT| ITA|        Italy|        3| device-mac-36TWSKiT|      44|      88.36.5.1|   42.83|   red|    12.83|Celsius|  19|1458444054120|
|            6|     1080|  US| USA|United Stat

***Question (1)***

***Definitions:*** 

**Resilient Distributed Datasets (RDDs):** RDDs are the foundational distributed data structure in Spark, representing immutable, fault-tolerant collections that can be processed in parallel.

**DataFrames:** DataFrames build on RDDs by adding a schema to each collection, making the data accessible in a structured, tabular format similar to a database table.

**Datasets:** Datasets are a hybrid between RDDs and DataFrames, offering both schema enforcement and compile-time type safety. The typed Dataset API is available only in Scala and Java; PySpark exposes DataFrames.


**RDDs:**

*Key Features:*

-	Control and Flexibility: RDDs provide full control over data distribution and low-level operations.

-	Lazy Evaluation: Transformations on RDDs are not executed until an action (like count() or collect()) is invoked, which lets Spark pipeline consecutive transformations and skip work whose results are never used.

-	No Schema: They do not have a predefined schema, making them versatile but more challenging to optimize.

*Performance Considerations:*

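Because RDDs have no schema and bypass the Catalyst optimizer and Tungsten execution engine, they are generally less efficient than DataFrames or Datasets. The gap is especially large in PySpark, where RDD transformations run as Python functions on every record.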

**Ideal Use Case:** Suitable for unstructured data or complex transformations where fine-grained control over data is necessary.


**DataFrames:**

*Key Features:*

-	Schema-Based: DataFrames support structured data with predefined schemas, which enables SQL-like operations and integration with Spark SQL.

-	Optimization: The Catalyst Optimizer automatically optimizes DataFrame queries, and the Tungsten engine further improves performance by managing memory more efficiently.

*Performance Considerations:*

DataFrames are significantly faster than RDDs because of these optimizations.

**Ideal Use Case:** DataFrames are ideal for ETL processes and SQL-like operations on structured data.


**Datasets:**

*Key Features:*

-	Type Safety: Datasets offer compile-time type checking, which helps catch errors early in development.

-	Optimized Execution: Datasets benefit from Spark SQL’s optimizations while providing a typed API.

-	Flexible Transformations: Datasets allow functional programming transformations (e.g., map, filter) similar to RDDs, but with the structure and optimization benefits of DataFrames.

*Performance Considerations:*

Datasets perform similarly to DataFrames, although heavy object manipulation can sometimes reduce optimization efficiency.

**Ideal Use Case:** Datasets are recommended when type safety and schema structure are needed, especially with semi-structured data.


***Differences:***

**Schema:**

*RDDs:* Do not have a schema; they are simply collections of objects.

*DataFrames:* Have a schema, which organizes data into named columns (like a table in SQL).

*Datasets:* Also have a schema and combine schema-based structure with type safety.

**Type Safety:**

*RDDs:* Type-safe in Scala and Java, meaning element types are checked at compile time.

*DataFrames:* Not type-safe; column names and types are only checked at runtime, when Spark analyzes the query.

*Datasets:* Type-safe, offering compile-time type checks like RDDs.

**Optimization:**

*RDDs:* No query optimizer; Spark executes the transformations essentially as written, which usually means higher processing time.

*DataFrames:* Optimized by Spark’s Catalyst Optimizer and Tungsten engine, enhancing query performance.

*Datasets:* Optimized similarly to DataFrames through the Catalyst and Tungsten engines.

**API Level:**

*RDDs:* Low-level API, requiring explicit definitions for operations and transformations.

*DataFrames:* High-level API, making data manipulation more concise and SQL-like.

*Datasets:* High-level API, combining DataFrame ease with RDD-like functional programming.

**Data Type Flexibility:**

*RDDs:* Suitable for unstructured data or complex data types.

*DataFrames:* Best for structured data, typically used with tabular data.

*Datasets:* Suitable for both structured and semi-structured data, allowing more flexible data representations.

**Use Cases:**

*RDDs:* Ideal for low-level operations where full control is needed, often for unstructured or complex data transformations.

*DataFrames:* Suited for SQL-like operations and structured data analysis.

*Datasets:* Ideal when schema and type safety are required together, typically for semi-structured or structured data that benefits from functional transformations.


***Summary of Differences***:

-	RDDs: Low-level, fault-tolerant, no schema, slower due to fewer optimizations.

-	DataFrames: High-level, schema-based, optimized by Catalyst and Tungsten engines.

-	Datasets: Combine type safety and schema-based optimization, bridging the gap between RDDs and DataFrames.


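In Spark 2.0, the DataFrame and Dataset APIs were unified: in Scala, `DataFrame` became an alias for `Dataset[Row]`, so developers work with a single high-level API and can opt into type safety by using a typed `Dataset[T]`. Python and R, being dynamically typed, expose only the untyped DataFrame API.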


***Question (2)***

***Question (2.1):*** 

How many sensor pads are reported to be from Poland?

Several ways to approach this question are presented:

In [None]:
# Create a view or table

temp_table_name = "iot_devices_json"

df.createOrReplaceTempView(temp_table_name)
df.count()

In [None]:
%sql

/* Query the temp view in a SQL cell and count the matching rows */

select count(*) as sensor_pad_count from `iot_devices_json` where cn = 'Poland' and device_name like 'sensor-pad%'

In [None]:

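# The temp view above is only available to this notebook. To save the data as a table that
# persists across cluster restarts and can be queried from other notebooks, uncomment the last line.
permanent_table_name = "iot_devices_json"

# df.write.format("parquet").saveAsTable(permanent_table_name)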


In [None]:
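# Two equivalent ways to answer this question with the DataFrame API: filter() and where().
# where() is an alias for filter(), so both give the same count.
# (count and when are also used by later cells.)
from pyspark.sql.functions import col, count, when

# Option 1: chain two filter() calls
poland_sensor_pads = df.filter(col("cn") == "Poland").filter(col("device_name").like("sensor-pad%"))
sensor_poland = poland_sensor_pads.count()

print(f"Number of sensor pads from Poland: {sensor_poland}")

# Option 2: one where() with both conditions combined by & (each condition in parentheses)
df.where((col("cn") == "Poland") & (col("device_name").like("sensor-pad%"))).count()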




***Question (2.2):***

How many different LCDs (distinct colors) are present in the dataset?

In [None]:
df.select("lcd").distinct().show()
distinct_color_count = df.select("lcd").distinct().count()
print(f"Number of distinct LCD colors: {distinct_color_count}")

***Question (2.3):***

Find the 5 countries with the largest number of MAC devices.

In [None]:
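from pyspark.sql.functions import col, count

# Keep only MAC devices (device names starting with "device-mac")
mac_devices_df = df.filter(col("device_name").like("device-mac%"))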

# Group by country ("cn") and count the number of MAC devices
country_mac_counts = mac_devices_df.groupBy("cn").agg(count("device_name").alias("mac_device_count"))

# Order by the count in descending order and select the top 5 countries
top_countries = country_mac_counts.orderBy("mac_device_count", ascending=False).limit(5)

# Show the results
top_countries.show()

***Question (2.4):***

Propose and try a statistical test or machine learning model to gain insight from this dataset.

Here are a few ideas for analyzing this dataset.

I used the numerical columns to run basic hierarchical clustering (visualized as a dendrogram), Pearson correlations, a correlation heatmap, and basic anomaly detection, and I also counted devices by brand and country. These are only possible analysis directions: data quality was not checked in depth.

In [None]:
# Select numerical columns
selected_df = df.select("battery_level", "c02_level", "temp")

# Drop any rows with null values in these columns
selected_df = selected_df.na.drop()

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Assemble features into a vector
assembler = VectorAssembler(inputCols=["battery_level", "c02_level", "temp"], outputCol="features")
assembled_df = assembler.transform(selected_df)

# Scale the features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(assembled_df)
scaled_df = scaler_model.transform(assembled_df)
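What `withMean=True, withStd=True` computes, sketched in NumPy on a few hypothetical rows: each column is centered on its mean and divided by its standard deviation. Spark's API documentation states that `StandardScaler` uses the corrected (n - 1) sample standard deviation, hence `ddof=1` here.

```python
import numpy as np

# Hypothetical (battery_level, c02_level, temp) rows
X = np.array([
    [8, 868, 34],
    [7, 1473, 11],
    [2, 1556, 19],
    [6, 1080, 28],
], dtype=float)

# withMean=True: subtract each column's mean
centered = X - X.mean(axis=0)

# withStd=True: divide by the corrected sample standard deviation (ddof=1)
scaled = centered / X.std(axis=0, ddof=1)

print(scaled.round(3))
```

Scaling matters here because `c02_level` is on a much larger scale than `battery_level` and `temp`; without it, distance-based methods such as Ward linkage would be driven almost entirely by CO2.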

In [None]:
import pandas as pd
from scipy.cluster.hierarchy import dendrogram, linkage
import matplotlib.pyplot as plt

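# Take the first 50 rows (not a random sample) so the dendrogram stays readable
sample_data = scaled_df.select("scaled_features").limit(50).toPandas()
# Convert each Spark ML vector into a NumPy array before building the feature matrix
sample_features = pd.DataFrame([v.toArray() for v in sample_data["scaled_features"]])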

# Perform hierarchical clustering using SciPy
Z = linkage(sample_features, method="ward")

# Plot the dendrogram
plt.figure(figsize=(10, 7))
dendrogram(Z, labels=sample_features.index.tolist())
plt.title("Hierarchical Clustering Dendrogram")
plt.xlabel("Device Index")
plt.ylabel("Distance")
plt.show()
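A dendrogram is usually read by cutting it at some height to obtain flat clusters. A minimal SciPy sketch of that step with `fcluster`, on hypothetical 2-D points forming two obvious groups; the linkage matrix `Z` from the cell above could be passed in the same way.

```python
import numpy as np
from scipy.cluster.hierarchy import linkage, fcluster

# Hypothetical scaled feature vectors: two well-separated groups
demo_points = np.array([[0.0, 0.0], [0.1, 0.2], [5.0, 5.0], [5.2, 4.9]])

# Ward linkage, as in the dendrogram cell
Z_demo = linkage(demo_points, method="ward")

# Cut the tree so that at most 2 flat clusters remain
labels = fcluster(Z_demo, t=2, criterion="maxclust")
print(labels)
```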

In [None]:
# Calculate the Pearson correlation between temperature and CO2 level
temperature_c02_corr = df.stat.corr("temp", "c02_level")
print(f"The correlation between temperature and CO2 level is: {temperature_c02_corr}")
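`df.stat.corr` computes the Pearson coefficient (the only method it currently supports): the covariance of the two columns divided by the product of their standard deviations. A small NumPy sketch of that formula on hypothetical values, which can be used to sanity-check the Spark result on a sample:

```python
import numpy as np

def pearson(x, y):
    """Pearson r: sum of centered products over the product of the centered norms."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    xc, yc = x - x.mean(), y - y.mean()
    return float((xc * yc).sum() / np.sqrt((xc ** 2).sum() * (yc ** 2).sum()))

# Hypothetical temp and c02_level readings
temp = [34, 11, 19, 28]
c02_level = [868, 1473, 1556, 1080]

r = pearson(temp, c02_level)
print(f"Pearson r = {r:.3f}")
```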

In [None]:
# List of column pairs to analyze
columns_to_compare = [("temp", "c02_level"), ("battery_level", "temp"), ("battery_level", "c02_level")]

# Calculate and display correlations for each pair
for col1, col2 in columns_to_compare:
    corr_value = df.stat.corr(col1, col2)
    print(f"The correlation between {col1} and {col2} is: {corr_value}")

In [None]:
import seaborn as sns

# Select numerical columns and convert to Pandas
numerical_data = df.select("battery_level", "temp", "c02_level", "humidity").toPandas()

# Calculate correlations
correlation_matrix = numerical_data.corr()

# Plot the heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm", vmin=-1, vmax=1)
plt.title("Correlation Heatmap of IoT Features")
plt.show()

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Spark MLlib has no built-in Isolation Forest, so KMeans is used here as a simple proxy
# for anomaly detection: devices far from their cluster center are candidate anomalies.
feature_cols = ["battery_level", "c02_level", "temp"]

# VectorAssembler fails on nulls by default, so drop incomplete rows first
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
device_features = assembler.transform(df.na.drop(subset=feature_cols)).select("features")

# Fit a 2-cluster KMeans model (features are unscaled, so c02_level dominates the distances)
kmeans = KMeans(k=2, seed=1, featuresCol="features")
model = kmeans.fit(device_features)

# Assign each device to a cluster; the cluster index is in the "prediction" column
predictions = model.transform(device_features)
predictions.show()
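KMeans only assigns cluster labels. One common way to turn it into an anomaly detector is to score each point by its distance to the nearest cluster center and flag the largest distances. A NumPy sketch of that scoring, where `centers` is a hypothetical stand-in for `model.clusterCenters()` (which PySpark returns as a list of NumPy arrays) and the 75th-percentile cut-off is an arbitrary illustrative choice:

```python
import numpy as np

def nearest_center_distance(points, centers):
    """Euclidean distance from each point to its closest cluster center."""
    points = np.asarray(points, dtype=float)
    centers = np.asarray(centers, dtype=float)
    # Pairwise distances, shape (n_points, n_centers)
    dists = np.linalg.norm(points[:, None, :] - centers[None, :, :], axis=2)
    return dists.min(axis=1)

# Hypothetical stand-in for model.clusterCenters(): (battery_level, c02_level, temp)
centers = [[5.0, 900.0, 20.0], [5.0, 1400.0, 20.0]]

# Hypothetical device readings; the last one has an unusually high CO2 level
readings = [[8, 868, 34], [7, 1473, 11], [2, 1556, 19], [9, 3000, 25]]

scores = nearest_center_distance(readings, centers)

# Flag points whose distance exceeds the (arbitrary) 75th-percentile threshold
threshold = np.percentile(scores, 75)
flagged = np.where(scores > threshold)[0]
print("scores:", scores.round(1))
print("flagged indices:", flagged)
```

With the real model, `centers` would be `model.clusterCenters()` and the readings would come from the assembled features (for example a `toPandas()` sample). Since these features are unscaled, `c02_level` dominates the distances; the `scaled_features` column from the scaling cell would weight the three columns more evenly.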

In [None]:
from pyspark.sql.functions import when, col, count

# Step 1: Create a new DataFrame with a brand column
device_dist = df.withColumn(
    "brand",
    when(col("device_name").like("device-mac%"), "Mac")
    .when(col("device_name").like("device-samsung%"), "Samsung")
    .when(col("device_name").like("device-apple%"), "Apple")
    .when(col("device_name").like("device-dell%"), "Dell")
    .otherwise("Other")
)

# Step 2: Group by country and brand, then count devices
device_dist = device_dist.groupBy("cn", "brand").agg(count("device_name").alias("device_count"))

# Show the new DataFrame with device counts by country and brand
device_dist.show()