In [1]:
# Configure the PySpark driver before any SparkSession exists: the submit
# arguments are only read when the JVM is launched (in a later cell).
import os

memory = '8g'
pyspark_submit_args = f" --driver-memory {memory} pyspark-shell"
os.environ.update(PYSPARK_SUBMIT_ARGS=pyspark_submit_args)

In [None]:
# Install shapely to deal with geospatial data
!pip install shapely

In [None]:
!kaggle datasets download -d dhruvildave/ookla-internet-speed-dataset -f "2020-q2/2020-04-01_performance_mobile_tiles.parquet"
!unzip 2020-04-01_performance_mobile_tiles.parquet.zip
!rm 2020-04-01_performance_mobile_tiles.parquet.zip

In [2]:
# Initialize the Spark session; getOrCreate() returns the already-running
# session if this cell is executed again.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("internet-analysis-clustering")
    .getOrCreate()
)

Picked up JAVA_TOOL_OPTIONS:  -Xmx3435m
Picked up JAVA_TOOL_OPTIONS:  -Xmx3435m
22/05/14 11:21:24 WARN Utils: Your hostname, fawazalesay-pysparkairl-mlxxsdnyoem resolves to a loopback address: 127.0.0.1; using 10.0.5.2 instead (on interface ceth0)
22/05/14 11:21:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/14 11:21:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/14 11:21:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


[('spark.executor.id', 'driver'), ('spark.sql.warehouse.dir', 'file:/workspace/pyspark-airline-delay-classification/spark-warehouse'), ('spark.app.id', 'local-1652527285957'), ('spark.rdd.compress', 'True'), ('spark.app.startTime', '1652527285134'), ('spark.driver.memory', '8g'), ('spark.app.name', 'internet-analysis-clustering'), ('spark.serializer.objectStreamReset', '100'), ('spark.driver.port', '35211'), ('spark.master', 'local[*]'), ('spark.submit.pyFiles', ''), ('spark.submit.deployMode', 'client'), ('spark.driver.host', '10.0.5.2'), ('spark.ui.showConsoleProgress', 'true')]


In [3]:
# Select plotly's 'iframe_connected' renderer; needed for plotly figures
# to display inside Jupyter when running on Gitpod.
from plotly import io as pio

pio.renderers.default = 'iframe_connected'

In [4]:
# Load the mobile performance tiles downloaded above.
PARQUET_PATH = "2020-04-01_performance_mobile_tiles.parquet"
df = spark.read.parquet(PARQUET_PATH)

# Column names and types.
df.printSchema()

# A peek at the first five rows.
df.show(5)

root
 |-- quadkey: string (nullable = true)
 |-- tile: string (nullable = true)
 |-- avg_d_kbps: long (nullable = true)
 |-- avg_u_kbps: long (nullable = true)
 |-- avg_lat_ms: long (nullable = true)
 |-- tests: long (nullable = true)
 |-- devices: long (nullable = true)

+----------------+--------------------+----------+----------+----------+-----+-------+
|         quadkey|                tile|avg_d_kbps|avg_u_kbps|avg_lat_ms|tests|devices|
+----------------+--------------------+----------+----------+----------+-----+-------+
|1203022122320032|POLYGON((24.09301...|     28772|      3165|        34|    8|      1|
|0313113213321131|POLYGON((-1.49963...|     20782|     10180|        54|    2|      2|
|1221210331312333|POLYGON((30.88806...|     22690|     22416|       449|    6|      2|
|1200312211223323|POLYGON((18.00109...|     54493|      4635|        21|    2|      2|
|0302233220203221|POLYGON((-81.5130...|     90669|      6576|        21|    1|      1|
+----------------+-------------

In [5]:
from shapely import wkt
from pyspark.sql.types import StringType
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf


def _centroid(polygon):
    """Parse a WKT geometry string and return its centroid point.

    Returns None when the input is None (a null ``tile`` value) or when the
    parsed geometry is empty, so the UDFs below can emit SQL NULL instead of
    raising inside the Spark executor and failing the whole job.
    """
    if polygon is None:
        return None
    geometry = wkt.loads(polygon)
    if geometry.is_empty:
        return None
    return geometry.centroid


@udf(returnType=DoubleType())
def longitude(polygon: "str | None"):
    """Spark UDF: x coordinate (longitude) of the WKT polygon's centroid, or NULL."""
    point = _centroid(polygon)
    return None if point is None else point.x


@udf(returnType=DoubleType())
def latitude(polygon: "str | None"):
    """Spark UDF: y coordinate (latitude) of the WKT polygon's centroid, or NULL."""
    point = _centroid(polygon)
    return None if point is None else point.y

In [6]:
# Append the centroid coordinates of each tile polygon as two new columns.
df = (
    df
    .withColumn("longitude", longitude("tile"))
    .withColumn("latitude", latitude("tile"))
)

# Inspect a few rows and the extended schema.
df.show(5)
df.printSchema()

[Stage 2:>                                                          (0 + 1) / 1]

+----------------+--------------------+----------+----------+----------+-----+-------+------------------+------------------+
|         quadkey|                tile|avg_d_kbps|avg_u_kbps|avg_lat_ms|tests|devices|         longitude|          latitude|
+----------------+--------------------+----------+----------+----------+-----+-------+------------------+------------------+
|1203022122320032|POLYGON((24.09301...|     28772|      3165|        34|    8|      1| 24.09576416015625| 49.88224742799456|
|0313113213321131|POLYGON((-1.49963...|     20782|     10180|        54|    2|      2| -1.49688720703125|52.953602268373295|
|1221210331312333|POLYGON((30.88806...|     22690|     22416|       449|    6|      2| 30.89080810546875|29.919232776382895|
|1200312211223323|POLYGON((18.00109...|     54493|      4635|        21|    2|      2| 18.00384521484375|59.356996008027856|
|0302233220203221|POLYGON((-81.5130...|     90669|      6576|        21|    1|      1|-81.51031494140625|41.317012752730506|


                                                                                

In [7]:
# Remove the "quadkey" and "tile" columns; the longitude/latitude columns
# computed above are used instead of the raw tile geometry.
df = df.drop("tile", "quadkey")
df.printSchema()

# Drop rows that contain a null in any remaining column.
df = df.dropna()

# Work on a random sample of roughly 5% of the rows (not half) to keep
# k-means fast. The fixed seed makes the sample, and therefore every
# downstream result, reproducible across runs.
SAMPLE_FRACTION = 0.05
SAMPLE_SEED = 42
df = df.sample(fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)

root
 |-- avg_d_kbps: long (nullable = true)
 |-- avg_u_kbps: long (nullable = true)
 |-- avg_lat_ms: long (nullable = true)
 |-- tests: long (nullable = true)
 |-- devices: long (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)



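# Pipeline

First, we combine the numeric feature columns into a single vector column with `VectorAssembler`.

Then we rescale every feature to the [0, 1] range with `MinMaxScaler`. Without scaling, large-magnitude columns (such as the speeds in kbps) would dominate the Euclidean distances that k-means relies on.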

In [8]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

# Numeric columns combined into the clustering feature vector.
# NOTE(review): avg_lat_ms is not included here — confirm that is intentional.
feature_columns = [
    "avg_d_kbps",
    "avg_u_kbps",
    "tests",
    "devices",
    "latitude",
    "longitude",
]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Min-max rescale each component of the assembled vector.
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

In [9]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml import Pipeline

# Fixed seed so that the k-means initialisation, and therefore the resulting
# clusters and silhouette score, are reproducible across runs.
KMEANS_SEED = 42

# k-means clusters on the min-max scaled vector produced by the scaler stage.
kmeans = KMeans(featuresCol="scaledFeatures", k=3, seed=KMEANS_SEED)

pipeline = Pipeline(stages=[assembler, scaler, kmeans])
model = pipeline.fit(df)

# Assign every row to a cluster.
predictions = model.transform(df)

# Evaluate the clustering by computing the silhouette score. The evaluator must
# read the same feature column the model was trained on: its default
# ("features") is the unscaled vector, which would score the clusters in a
# different space from the one k-means optimised.
evaluator = ClusteringEvaluator(featuresCol="scaledFeatures")

print("Silhouette with squared euclidean distance = ", evaluator.evaluate(predictions))

22/05/14 11:22:26 ERROR Instrumentation: java.lang.IllegalArgumentException: scaledFeatures does not exist. Available: avg_d_kbps, avg_u_kbps, avg_lat_ms, tests, devices, longitude, latitude, features
	at org.apache.spark.sql.types.StructType.$anonfun$apply$1(StructType.scala:278)
	at scala.collection.immutable.HashMap$HashTrieMap.getOrElse0(HashMap.scala:596)
	at scala.collection.immutable.HashMap.getOrElse(HashMap.scala:73)
	at org.apache.spark.sql.types.StructType.apply(StructType.scala:277)
	at org.apache.spark.ml.util.SchemaUtils$.checkColumnTypes(SchemaUtils.scala:59)
	at org.apache.spark.ml.util.SchemaUtils$.validateVectorCompatibleColumn(SchemaUtils.scala:205)
	at org.apache.spark.ml.clustering.KMeansParams.validateAndTransformSchema(KMeans.scala:98)
	at org.apache.spark.ml.clustering.KMeansParams.validateAndTransformSchema$(KMeans.scala:97)
	at org.apache.spark.ml.clustering.KMeans.validateAndTransformSchema(KMeans.scala:272)
	at org.apache.spark.ml.clustering.KMeans.transform

IllegalArgumentException: scaledFeatures does not exist. Available: avg_d_kbps, avg_u_kbps, avg_lat_ms, tests, devices, longitude, latitude, features