In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Asia Towers Analysis") \
    .getOrCreate()


24/08/16 11:30:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
net_df = spark.read.csv("/user/dataproc/asia_towers.csv", header=True, inferSchema=True)
net_df.show()


                                                                                

+--------+-----+---+---+---+---------+----+---------------+---------------+-----+----+----------+----------+----------+-------------+---------+-------+---------+
|     _c0|radio|MCC|MNC|TAC|      CID|unit|            LON|            LAT|RANGE| SAM|changeable|   created|   updated|averageSignal|  Country|Network|Continent|
+--------+-----+---+---+---+---------+----+---------------+---------------+-----+----+----------+----------+----------+-------------+---------+-------+---------+
|12142742|  GSM|525|  1| 63|      452|   0|     103.827896|       1.431656| 1000|   3|         1|1459692344|1487379337|            0|Singapore|SingTel|     Asia|
|12142743| UMTS|525|  1|315| 20666852|   0|     103.625793|       1.309433| 1000|   2|         1|1370464837|1370464837|            0|Singapore|SingTel|     Asia|
|12142744|  GSM|525|  1| 63|     5143|   0|     103.838882|       1.425247| 1000|   3|         1|1380831294|1461549037|            0|Singapore|SingTel|     Asia|
|12142745|  GSM|525|  1| 63|

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import dayofmonth, month, year
from pyspark.sql.functions import count, when

In [4]:
from pyspark.sql.functions import col

net_df = net_df.withColumn("LON", col("LON").cast("float"))
net_df = net_df.withColumn("LAT", col("LAT").cast("float"))
net_df = net_df.withColumn("averageSignal", col("averageSignal").cast("float"))

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

net_df = net_df.withColumn("LON", col("LON").cast("float"))
net_df = net_df.withColumn("LAT", col("LAT").cast("float"))
net_df = net_df.withColumn("averageSignal", col("averageSignal").cast("float"))

# columns Longitude and Latitude
data = net_df.select('LON', 'LAT', 'averageSignal')
data = data.na.drop()

# Features
assembler = VectorAssembler(inputCols=['LON', 'LAT', 'averageSignal'], outputCol='features')
data = assembler.transform(data)

final_data = data.select('features', 'LON', 'LAT', 'averageSignal')
kmeans = KMeans(k=3, seed=1)  # 3 clusters
model = kmeans.fit(final_data)
predictions = model.transform(final_data)
predictions.show()


                                                                                

+--------------------+----------+---------+-------------+----------+
|            features|       LON|      LAT|averageSignal|prediction|
+--------------------+----------+---------+-------------+----------+
|[103.827896118164...|103.827896| 1.431656|          0.0|         0|
|[103.625793457031...| 103.62579| 1.309433|          0.0|         0|
|[103.838882446289...| 103.83888| 1.425247|          0.0|         0|
|[103.838775634765...|103.838776| 1.416006|          0.0|         0|
|[103.825607299804...| 103.82561|1.4261627|          0.0|         0|
|[103.744926452636...| 103.74493| 1.336899|          0.0|         0|
|[103.799514770507...|103.799515| 1.334702|          0.0|         0|
|[103.780204772949...|103.780205| 1.337385|          0.0|         0|
|[103.779724121093...|103.779724| 1.336818|          0.0|         0|
|[103.780769348144...| 103.78077| 1.336818|          0.0|         0|
|[103.787704467773...|103.787704| 1.334921|          0.0|         0|
|[103.793083190917...| 103.79308| 

In [6]:

from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)




Silhouette with squared euclidean distance = 0.728182380932545
Cluster Centers: 
[117.71041976  20.98159884   0.        ]
[53.90666525 31.81664544  0.        ]
[40.62338249 54.85841798  0.        ]


                                                                                

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
net_df = net_df.withColumn("LON", col("LON").cast("float"))
net_df = net_df.withColumn("LAT", col("LAT").cast("float"))
net_df = net_df.withColumn("averageSignal", col("averageSignal").cast("float"))


data = net_df.select('LON', 'LAT', 'averageSignal')
data = data.na.drop()
assembler = VectorAssembler(inputCols=['LON', 'LAT', 'averageSignal'], outputCol='features')
data = assembler.transform(data)
final_data = data.select('features', 'LON', 'LAT', 'averageSignal')

bisecting_kmeans = BisectingKMeans(k=3, seed=1)  # 3 clusters
model = bisecting_kmeans.fit(final_data)
predictions = model.transform(final_data)

predictions.show()

evaluator = ClusteringEvaluator()
silhouette_score = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette_score}")


                                                                                

+--------------------+----------+---------+-------------+----------+
|            features|       LON|      LAT|averageSignal|prediction|
+--------------------+----------+---------+-------------+----------+
|[103.827896118164...|103.827896| 1.431656|          0.0|         1|
|[103.625793457031...| 103.62579| 1.309433|          0.0|         1|
|[103.838882446289...| 103.83888| 1.425247|          0.0|         1|
|[103.838775634765...|103.838776| 1.416006|          0.0|         1|
|[103.825607299804...| 103.82561|1.4261627|          0.0|         1|
|[103.744926452636...| 103.74493| 1.336899|          0.0|         1|
|[103.799514770507...|103.799515| 1.334702|          0.0|         1|
|[103.780204772949...|103.780205| 1.337385|          0.0|         1|
|[103.779724121093...|103.779724| 1.336818|          0.0|         1|
|[103.780769348144...| 103.78077| 1.336818|          0.0|         1|
|[103.787704467773...|103.787704| 1.334921|          0.0|         1|
|[103.793083190917...| 103.79308| 



Silhouette Score: 0.778049013432422


                                                                                

In [8]:
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")

centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)





Silhouette with squared euclidean distance = 0.7780490134324222
Cluster Centers: 
[47.13002998 42.99370208  0.        ]
[105.85523584   8.28632877   0.        ]
[130.03335306  34.69222654   0.        ]


                                                                                

In [6]:
cluster_distribution = predictions.groupBy("prediction").count()
cluster_distribution.show()



+----------+-------+
|prediction|  count|
+----------+-------+
|         1|2170178|
|         2|1886018|
|         0|9319793|
+----------+-------+



                                                                                

In [11]:
from pyspark.sql.functions import col, round as spark_round
net_df = net_df.filter(net_df.LON.isNotNull() & net_df.LAT.isNotNull() & net_df.RANGE.isNotNull())


net_df = net_df.withColumn("rounded_LON", spark_round(net_df.LON, 1))
net_df = net_df.withColumn("rounded_LAT", spark_round(net_df.LAT, 1))

range_by_area = net_df.groupBy("rounded_LON", "rounded_LAT").avg("RANGE")
range_by_area.show()



+-----------+-----------+------------------+
|rounded_LON|rounded_LAT|        avg(RANGE)|
+-----------+-----------+------------------+
|       98.9|        3.6| 1272.960244648318|
|      107.7|       -7.3|            1000.0|
|       98.7|        1.7|            1000.0|
|      113.9|       -1.3|           1248.75|
|      116.9|       -1.0|2376.6666666666665|
|      118.8|       -8.4|            1000.0|
|       68.5|       43.5|1031.6797385620914|
|       75.9|       43.6|            1000.0|
|       69.4|       43.3|           4475.25|
|      113.0|       28.5|            1115.2|
|      117.5|       35.6|            1000.0|
|      106.8|       30.3|            1000.0|
|      105.1|       25.6|            1000.0|
|      119.2|       30.2|1018.6153846153846|
|      118.5|       35.1|            1000.0|
|       82.3|       44.5|            2762.0|
|      110.0|       32.3|            1000.0|
|       37.6|       56.4|1671.2953367875648|
|       38.1|       53.2|2731.0779220779223|
|       45

                                                                                

In [14]:
data_path = '/user/dataproc/asia_towers.csv'
rdd = sc.textFile(data_path)

header = rdd.first()

rdd = rdd.filter(lambda line: line != header).map(lambda line: line.split(','))

def map_radio_technology(line):
    return (line[1], 1)  # RadioTechnology

def reduce_count(a, b):
    return a + b

radio_tech_distribution = rdd.map(map_radio_technology).reduceByKey(reduce_count).collect()

def map_time_difference(line):
    try:
        created = float(line[11])  #  timestamp at index 11
        updated = float(line[12])  #  timestamp at index 12
        return (line[1], (updated - created, 1)) 
    except ValueError:
        return (line[1], (0, 0))

def reduce_time_differences(a, b):
    total_diff = a[0] + b[0]  
    count = a[1] + b[1]  
    return (total_diff, count)

time_diff_totals = rdd.map(map_time_difference).reduceByKey(reduce_time_differences)

average_time_diff = time_diff_totals.mapValues(lambda x: x[0] / x[1] if x[1] != 0 else 0).collect()

print("Radio Technology Distribution:")
for tech, count in radio_tech_distribution:
    print(f"{tech}: {count}")

print('------------------------------------------------------------------')

print("Average Time Difference (seconds) by Radio Technology:")
for tech, avg_diff in average_time_diff:
    print(f"{tech}: {avg_diff}")






Radio Technology Distribution:
GSM: 2541259
NR: 20743
UMTS: 5703346
CDMA: 5245
LTE: 5105396
------------------------------------------------------------------
Average Time Difference (seconds) by Radio Technology:
GSM: 1466390316.0788128
NR: 1701107005.4001832
UMTS: 1469507735.909973
CDMA: 1414208006.8806481
LTE: 1534000192.7756891


                                                                                

In [16]:
from pyspark.sql.functions import from_unixtime, to_date, month
net_df = net_df.withColumn("update_date", to_date(from_unixtime(col("updated"))))

update_patterns = net_df.groupBy(month("update_date").alias("month")).count().orderBy("month")
update_patterns.show()



+-----+-------+
|month|  count|
+-----+-------+
|    1| 900332|
|    2| 903108|
|    3|1133696|
|    4|1393691|
|    5|1648922|
|    6| 615985|
|    7| 710857|
|    8| 948461|
|    9|2587132|
|   10| 861681|
|   11| 803811|
|   12| 868313|
+-----+-------+



                                                                                