In [1]:
from pyspark.sql import SparkSession
import sys
from pathlib import Path

PROJECT_ROOT = Path().resolve().parents[0]
sys.path.append(str(PROJECT_ROOT))

from utils.helpers import load_cfg

# --------------------------------------------------
# Load config
# --------------------------------------------------
CFG_FILE = "../utils/config.yaml"
cfg = load_cfg(CFG_FILE)
lake_cfg = cfg["datalake"]

# --------------------------------------------------
# Spark Session (FIXED)
# --------------------------------------------------
spark = (
    SparkSession.builder
    .appName("MinIO_EDA_40GB")

    # REQUIRED JARS
    .config(
        "spark.jars.packages",
        ",".join([
            "org.apache.hadoop:hadoop-aws:3.3.4",
            "com.amazonaws:aws-java-sdk-bundle:1.12.262",
            "org.datasyslab:geotools-wrapper:1.6.0-28.2",
            "org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.6.1",
        ])
    )

    # S3A / MinIO CONFIG
    .config("spark.hadoop.fs.s3a.endpoint", f"http://{lake_cfg['endpoint']}")
    .config("spark.hadoop.fs.s3a.access.key", lake_cfg["access_key"])
    .config("spark.hadoop.fs.s3a.secret.key", lake_cfg["secret_key"])
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    # (OPTIONAL but recommended for local)
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")

    .getOrCreate()
)


:: loading settings :: url = jar:file:/home/nghia/Downloads/AIDE%202/ookla_speed_test_from_s3/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/nghia/.ivy2/cache
The jars for the packages stored in: /home/nghia/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
org.apache.sedona#sedona-spark-shaded-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4bd687bb-844c-4054-b2e5-ab53a9ede899;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found org.datasyslab#geotools-wrapper;1.6.0-28.2 in central
	found org.apache.sedona#sedona-spark-shaded-3.5_2.12;1.6.1 in central
:: resolution report :: resolve 253ms :: artifacts dl 11ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.apache.

In [2]:
hconf = spark._jsc.hadoopConfiguration()

print("endpoint:", hconf.get("fs.s3a.endpoint"))
print("access key:", hconf.get("fs.s3a.access.key"))
print("path style:", hconf.get("fs.s3a.path.style.access"))
print("ssl:", hconf.get("fs.s3a.connection.ssl.enabled"))


endpoint: http://localhost:9000
access key: minio_access_key
path style: true
ssl: false


In [3]:
bucket_name = lake_cfg["bucket_name"]
folder_name = lake_cfg["folder_name"]
quater_name = lake_cfg["quater_name"]
sample_file = lake_cfg["sample_file"]

s3_path = f"s3a://{bucket_name}/{folder_name}/"


s3_sample_file = f"{s3_path}/{sample_file}"
print(f"Reading from: {s3_sample_file}")
df = spark.read.parquet(s3_sample_file)

df.printSchema()
print(f"Total records: {df.count()}")
print(df.describe().show(5))

df.show(5, truncate=False)


Reading from: s3a://bronze/pump//2019-q1/2019-01-01_performance_mobile_tiles.parquet


25/12/15 13:41:19 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

root
 |-- quadkey: string (nullable = true)
 |-- tile: string (nullable = true)
 |-- avg_d_kbps: long (nullable = true)
 |-- avg_u_kbps: long (nullable = true)
 |-- avg_lat_ms: long (nullable = true)
 |-- tests: long (nullable = true)
 |-- devices: long (nullable = true)

Total records: 3231245
Total records: 3231245


25/12/15 13:41:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+--------------------+--------------------+-----------------+-----------------+-----------------+------------------+------------------+
|summary|             quadkey|                tile|       avg_d_kbps|       avg_u_kbps|       avg_lat_ms|             tests|           devices|
+-------+--------------------+--------------------+-----------------+-----------------+-----------------+------------------+------------------+
|  count|             3231245|             3231245|          3231245|          3231245|          3231245|           3231245|           3231245|
|   mean|1.162786623253194E15|                NULL|25139.94285236805|9392.571669743396|51.50450182514789| 6.682542796971446|3.2860699204176718|
| stddev|6.756817123721292E14|                NULL|28021.85970085899|9087.560565488646|56.50288452942403|21.783756454729218| 8.663706859080442|
|    min|    0022133222330121|POLYGON((-0.00549...|                1|                1|                0|                 1|            

In [9]:
df.select("quadkey").show(5, truncate=False)

+----------------+
|quadkey         |
+----------------+
|1202130120303121|
|0322113021201023|
|3100130002212100|
|1231213031333131|
|0230102031111210|
+----------------+
only showing top 5 rows



In [10]:
spark.version

'3.5.1'

In [11]:
spark.sparkContext.getConf().get("spark.jars.packages")


'org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262,org.datasyslab:geotools-wrapper:1.6.0-28.2,org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.6.1'

In [13]:
from sedona.spark import SedonaContext
from sedona.spark import SedonaRegistrator
SedonaRegistrator.registerAll(spark)

sedona = SedonaContext.create(spark)

# Th·ª≠ ƒë·ªçc shapefile b·∫±ng c√°ch ch·ªâ ƒë·ªãnh ƒë∆∞·ªùng d·∫´n folder
# Shapefile format y√™u c·∫ßu c√°c packages ƒë·∫∑c bi·ªát
try:
    tiles_df = sedona.read.format("shapefile") \
        .load(f"s3a://bronze/pump/2019-q1/gps_fixed_tiles.shp")
    tiles_df.printSchema()
except Exception as e:
    print(f"L·ªói khi ƒë·ªçc shapefile: {e}")
    print("\nTh·ª≠ ƒë·ªçc file parquet thay th·∫ø...")
    # N·∫øu kh√¥ng c√≥ shapefile, th·ª≠ parquet
    tiles_df = spark.read.parquet(f"s3a://bronze/pump/2019-q1/")
    tiles_df.printSchema()


  SedonaRegistrator.registerAll(spark)
25/12/15 12:35:02 WARN UDTRegistration: Cannot register UDT for org.geotools.coverage.grid.GridCoverage2D, which is already registered.
25/12/15 12:35:02 WARN SimpleFunctionRegistry: The function rs_union_aggr replaced a previously registered function.
25/12/15 12:35:02 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.geom.Geometry, which is already registered.
25/12/15 12:35:02 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.index.SpatialIndex, which is already registered.
25/12/15 12:35:02 WARN SimpleFunctionRegistry: The function st_envelope_aggr replaced a previously registered function.
25/12/15 12:35:02 WARN SimpleFunctionRegistry: The function st_intersection_aggr replaced a previously registered function.
25/12/15 12:35:02 WARN SimpleFunctionRegistry: The function st_union_aggr replaced a previously registered function.
25/12/15 12:35:03 WARN UDTRegistration: Cannot register UDT for org.geotools.coverag

L·ªói khi ƒë·ªçc shapefile: An error occurred while calling o202.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: shapefile. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImp

25/12/15 12:35:03 WARN SimpleFunctionRegistry: The function rs_union_aggr replaced a previously registered function.
25/12/15 12:35:03 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.geom.Geometry, which is already registered.
25/12/15 12:35:03 WARN UDTRegistration: Cannot register UDT for org.locationtech.jts.index.SpatialIndex, which is already registered.
25/12/15 12:35:03 WARN SimpleFunctionRegistry: The function st_envelope_aggr replaced a previously registered function.
25/12/15 12:35:03 WARN SimpleFunctionRegistry: The function st_intersection_aggr replaced a previously registered function.
25/12/15 12:35:03 WARN SimpleFunctionRegistry: The function st_union_aggr replaced a previously registered function.


In [3]:
# H√†m chuy·ªÉn ƒë·ªïi quadkey sang lat/lon
def quadkey_to_latlon(quadkey):
    """
    Chuy·ªÉn ƒë·ªïi Bing Maps quadkey th√†nh latitude v√† longitude (t√¢m c·ªßa tile)
    """
    lat_min, lat_max = -85.05112878, 85.05112878
    lon_min, lon_max = -180.0, 180.0
    
    for digit in quadkey:
        lat_mid = (lat_min + lat_max) / 2
        lon_mid = (lon_min + lon_max) / 2
        
        if digit == '0':  # Top-left
            lat_min = lat_mid
            lon_max = lon_mid
        elif digit == '1':  # Top-right
            lat_min = lat_mid
            lon_min = lon_mid
        elif digit == '2':  # Bottom-left
            lat_max = lat_mid
            lon_max = lon_mid
        elif digit == '3':  # Bottom-right
            lat_max = lat_mid
            lon_min = lon_mid
    
    # Tr·∫£ v·ªÅ t√¢m c·ªßa tile
    lat = (lat_min + lat_max) / 2
    lon = (lon_min + lon_max) / 2
    return lat, lon

# Test h√†m v·ªõi m·ªôt v√†i quadkey
test_quadkeys = df.select("quadkey").limit(5).collect()
for row in test_quadkeys:
    qk = row.quadkey
    lat, lon = quadkey_to_latlon(qk)
    print(f"Quadkey: {qk} -> Lat: {lat:.6f}, Lon: {lon:.6f}")

Quadkey: 1202130120303121 -> Lat: 28.793783, Lon: 18.531189
Quadkey: 0322113021201023 -> Lat: 8.878094, Lon: -69.908752
Quadkey: 3100130002212100 -> Lat: -2.929083, Lon: 106.987610
Quadkey: 1231213031333131 -> Lat: 14.134092, Lon: 77.341003
Quadkey: 0230102031111210 -> Lat: 19.589947, Lon: -122.373962


In [4]:
# S·ª≠ d·ª•ng geopy ƒë·ªÉ reverse geocoding (lat/lon -> city name)
# C√†i ƒë·∫∑t geopy n·∫øu ch∆∞a c√≥: pip install geopy
try:
    from geopy.geocoders import Nominatim
    from geopy.exc import GeocoderTimedOut, GeocoderServiceError
    import time
    
    geolocator = Nominatim(user_agent="ookla_speed_test_analyzer")
    
    def get_city_from_latlon(lat, lon, max_retries=3):
        """
        L·∫•y t√™n th√†nh ph·ªë t·ª´ coordinates v·ªõi retry logic
        """
        for attempt in range(max_retries):
            try:
                location = geolocator.reverse(f"{lat}, {lon}", language='en', timeout=10)
                if location and location.raw.get('address'):
                    address = location.raw['address']
                    # ∆Øu ti√™n l·∫•y city, n·∫øu kh√¥ng c√≥ th√¨ l·∫•y town, village, ho·∫∑c state
                    city = address.get('city') or address.get('town') or \
                           address.get('village') or address.get('state') or \
                           address.get('country', 'Unknown')
                    return city
                return 'Unknown'
            except (GeocoderTimedOut, GeocoderServiceError):
                if attempt < max_retries - 1:
                    time.sleep(1)  # Wait before retry
                    continue
                return 'Unknown'
            except Exception as e:
                return 'Unknown'
        return 'Unknown'
    
    # Test v·ªõi m·ªôt v√†i t·ªça ƒë·ªô
    print("Testing reverse geocoding...")
    for row in test_quadkeys[:3]:  # Ch·ªâ test 3 c√°i ƒë·ªÉ kh√¥ng b·ªã rate limit
        qk = row.quadkey
        lat, lon = quadkey_to_latlon(qk)
        city = get_city_from_latlon(lat, lon)
        print(f"Quadkey: {qk} -> Lat: {lat:.6f}, Lon: {lon:.6f} -> City: {city}")
        time.sleep(1)  # Delay ƒë·ªÉ tr√°nh rate limit
        
except ImportError:
    print("geopy ch∆∞a ƒë∆∞·ª£c c√†i ƒë·∫∑t. C√†i ƒë·∫∑t b·∫±ng: pip install geopy")
    print("Ho·∫∑c s·ª≠ d·ª•ng ph∆∞∆°ng ph√°p kh√°c ƒë·ªÉ map coordinates sang city.")

Testing reverse geocoding...
Quadkey: 1202130120303121 -> Lat: 28.793783, Lon: 18.531189 -> City: Al Wahat
Quadkey: 0322113021201023 -> Lat: 8.878094, Lon: -69.908752 -> City: Las Tinajitas
Quadkey: 3100130002212100 -> Lat: -2.929083, Lon: 106.987610 -> City: Bangka-Belitung Islands


In [3]:
# T·∫°o UDF cho PySpark ƒë·ªÉ convert quadkey -> city
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField, DoubleType

# UDF ƒë·ªÉ convert quadkey sang lat/lon
@udf(returnType=StructType([
    StructField("lat", DoubleType(), False),
    StructField("lon", DoubleType(), False)
]))
def quadkey_to_latlon_udf(quadkey):
    """PySpark UDF ƒë·ªÉ chuy·ªÉn quadkey th√†nh lat/lon"""
    if not quadkey:
        return (0.0, 0.0)
    
    lat_min, lat_max = -85.05112878, 85.05112878
    lon_min, lon_max = -180.0, 180.0
    
    for digit in quadkey:
        lat_mid = (lat_min + lat_max) / 2
        lon_mid = (lon_min + lon_max) / 2
        
        if digit == '0':
            lat_min = lat_mid
            lon_max = lon_mid
        elif digit == '1':
            lat_min = lat_mid
            lon_min = lon_mid
        elif digit == '2':
            lat_max = lat_mid
            lon_max = lon_mid
        elif digit == '3':
            lat_max = lat_mid
            lon_min = lon_mid
    
    lat = (lat_min + lat_max) / 2
    lon = (lon_min + lon_max) / 2
    return (lat, lon)

# Th√™m c·ªôt lat/lon v√†o DataFrame
df_with_coords = df.withColumn("coords", quadkey_to_latlon_udf(df.quadkey))
df_with_coords = df_with_coords.withColumn("latitude", df_with_coords.coords.lat)
df_with_coords = df_with_coords.withColumn("longitude", df_with_coords.coords.lon)
df_with_coords = df_with_coords.drop("coords")

print("DataFrame v·ªõi lat/lon:")
df_with_coords.select("quadkey", "latitude", "longitude", "avg_d_kbps").show(10, truncate=False)

DataFrame v·ªõi lat/lon:


[Stage 8:>                                                          (0 + 1) / 1]

+----------------+-------------------+-------------------+----------+
|quadkey         |latitude           |longitude          |avg_d_kbps|
+----------------+-------------------+-------------------+----------+
|1202130120303121|28.793783481473696 |18.53118896484375  |12503     |
|0322113021201023|8.878094054931335  |-69.90875244140625 |16109     |
|3100130002212100|-2.9290832161935425|106.98760986328125 |10325     |
|1231213031333131|14.134091850936585 |77.34100341796875  |7187      |
|0230102031111210|19.58994734091339  |-122.37396240234375|76282     |
|2112000120231213|-11.102484233290099|-43.44268798828125 |15858     |
|1201003332313113|39.92611659113617  |27.77069091796875  |48691     |
|1231230102232012|12.973879004114686 |74.63287353515625  |9621      |
|2103112302311001|-12.875247934362488|-48.90289306640625 |2572      |
|1322233333332311|0.0090844406350708 |101.20330810546875 |8672      |
+----------------+-------------------+-------------------+----------+
only showing top 10 

                                                                                

## Ph∆∞∆°ng ph√°p 1: Reverse Geocoding cho m·ªôt s·ªë m·∫´u
V√¨ c√≥ h∆°n 3 tri·ªáu records v√† reverse geocoding r·∫•t ch·∫≠m (m·ªói request m·∫•t 1-2 gi√¢y), ta s·∫Ω:
1. L·∫•y m·ªôt sample nh·ªè ƒë·ªÉ demo
2. Ho·∫∑c cache c√°c quadkey unique v√† geocode t·ª´ng c√°i m·ªôt

**L∆∞u √Ω:** ƒê·ªÉ geocode to√†n b·ªô dataset, n√™n:
- S·ª≠ d·ª•ng batch geocoding service (Google Maps API, HERE API c√≥ h·ªó tr·ª£)
- Ho·∫∑c s·ª≠ d·ª•ng offline dataset nh∆∞ GeoNames
- Ho·∫∑c d√πng H3 spatial index v·ªõi pre-built city boundaries

In [6]:
# L·∫•y sample 100 records ƒë·ªÉ demo reverse geocoding
sample_df = df_with_coords.limit(100).toPandas()

print(f"Sample size: {len(sample_df)} records")
print("\nƒêang th·ª±c hi·ªán reverse geocoding cho sample...")

# Th√™m c·ªôt city
cities = []
for idx, row in sample_df.iterrows():
    city = get_city_from_latlon(row['latitude'], row['longitude'])
    cities.append(city)
    if idx % 10 == 0:
        print(f"Processed {idx+1}/{len(sample_df)} records...")
    time.sleep(0.5)  # Delay ƒë·ªÉ tr√°nh rate limit

sample_df['city'] = cities

print("\n‚úÖ Ho√†n th√†nh! Hi·ªÉn th·ªã k·∫øt qu·∫£:")
print(sample_df[['quadkey', 'latitude', 'longitude', 'city', 'avg_d_kbps', 'avg_u_kbps']].head(20))

Sample size: 100 records

ƒêang th·ª±c hi·ªán reverse geocoding cho sample...
Processed 1/100 records...
Processed 11/100 records...
Processed 21/100 records...
Processed 31/100 records...
Processed 41/100 records...
Processed 51/100 records...
Processed 61/100 records...
Processed 71/100 records...
Processed 81/100 records...
Processed 91/100 records...

‚úÖ Ho√†n th√†nh! Hi·ªÉn th·ªã k·∫øt qu·∫£:
             quadkey   latitude   longitude                     city  \
0   1202130120303121  28.793783   18.531189                 Al Wahat   
1   0322113021201023   8.878094  -69.908752            Las Tinajitas   
2   3100130002212100  -2.929083  106.987610  Bangka-Belitung Islands   
3   1231213031333131  14.134092   77.341003            Shrirangapura   
4   0230102031111210  19.589947 -122.373962                  Unknown   
5   2112000120231213 -11.102484  -43.442688                    Barra   
6   1201003332313113  39.926117   27.770691                    Balya   
7   1231230102232012  

In [7]:
# Th·ªëng k√™ theo th√†nh ph·ªë
import pandas as pd

city_stats = sample_df.groupby('city').agg({
    'avg_d_kbps': 'mean',
    'avg_u_kbps': 'mean',
    'avg_lat_ms': 'mean',
    'tests': 'sum',
    'quadkey': 'count'
}).round(0)

city_stats.columns = ['Avg Download (kbps)', 'Avg Upload (kbps)', 'Avg Latency (ms)', 'Total Tests', 'Tile Count']
city_stats = city_stats.sort_values('Avg Download (kbps)', ascending=False)

print("üìä Th·ªëng k√™ t·ªëc ƒë·ªô Internet theo th√†nh ph·ªë (Top 20):")
print("=" * 100)
print(city_stats.head(20))

print("\n\nüåç S·ªë l∆∞·ª£ng tile theo th√†nh ph·ªë:")
print(city_stats['Tile Count'].sort_values(ascending=False).head(10))

üìä Th·ªëng k√™ t·ªëc ƒë·ªô Internet theo th√†nh ph·ªë (Top 20):
                  Avg Download (kbps)  Avg Upload (kbps)  Avg Latency (ms)  \
city                                                                         
Matanzas                     136608.0            30223.0              27.0   
In Amguel                     67252.0             5296.0              27.0   
Thung Lung                    59884.0            24576.0              25.0   
Balya                         48691.0            27616.0              33.0   
Mato Grosso                   47461.0            21316.0              45.0   
Kedah                         41739.0            10196.0              23.0   
Bartolom√© Mas√≥                38924.0            11420.0              51.0   
San Cristobal                 38839.0            14474.0              22.0   
Xique-Xique                   32114.0            12318.0              32.0   
Atbarah                       31336.0            12567.0              55.0

## Ph∆∞∆°ng ph√°p 2: Apply city cho to√†n b·ªô DataFrame (T√πy ch·ªçn)

ƒê·ªÉ √°p d·ª•ng cho to√†n b·ªô 3+ tri·ªáu records, c√≥ c√°c ph∆∞∆°ng ph√°p:

### A. S·ª≠ d·ª•ng pre-built city boundaries dataset
- Download GeoNames cities database
- D√πng spatial join v·ªõi Sedona

### B. Batch geocoding v·ªõi API service
- Google Maps Geocoding API (c√≥ ph√≠)
- HERE Geocoding API
- Nominatim v·ªõi rate limiting

### C. Offline reverse geocoding
- S·ª≠ d·ª•ng reverse_geocoder library (Python)
- D√πng H3 v·ªõi city boundaries

D∆∞·ªõi ƒë√¢y l√† demo v·ªõi ph∆∞∆°ng ph√°p C (offline, nhanh nh·∫•t):

In [4]:
# C√†i ƒë·∫∑t reverse_geocoder cho offline geocoding (nhanh h∆°n nhi·ªÅu)
try:
    import reverse_geocoder as rg
    print("‚úÖ reverse_geocoder ƒë√£ ƒë∆∞·ª£c c√†i ƒë·∫∑t")
    
    # Test offline reverse geocoding
    print("\nTest offline reverse geocoding v·ªõi sample coordinates:")
    test_coords = [(28.793783, 18.531189), (8.878094, -69.908752), (-2.929083, 106.987610)]
    results = rg.search(test_coords)
    
    for coord, result in zip(test_coords, results):
        print(f"Lat/Lon: {coord} -> City: {result['name']}, Country: {result['cc']}")
    
except ImportError:
    print("‚ö†Ô∏è reverse_geocoder ch∆∞a ƒë∆∞·ª£c c√†i ƒë·∫∑t.")
    print("C√†i ƒë·∫∑t b·∫±ng: pip install reverse-geocoder")
    print("\nƒê√¢y l√† th∆∞ vi·ªán offline geocoding, nhanh h∆°n r·∫•t nhi·ªÅu so v·ªõi Nominatim API.")

‚úÖ reverse_geocoder ƒë√£ ƒë∆∞·ª£c c√†i ƒë·∫∑t

Test offline reverse geocoding v·ªõi sample coordinates:
Loading formatted geocoded file...
Lat/Lon: (28.793783, 18.531189) -> City: Maradah, Country: LY
Lat/Lon: (8.878094, -69.908752) -> City: Boconoito, Country: VE
Lat/Lon: (-2.929083, 106.98761) -> City: Kepoh, Country: ID


In [4]:
# T·∫°o UDF v·ªõi reverse_geocoder (offline, r·∫•t nhanh!)
import reverse_geocoder as rg
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd

@pandas_udf(StringType())
def get_city_udf(lat: pd.Series, lon: pd.Series) -> pd.Series:
    """
    Pandas UDF ƒë·ªÉ convert lat/lon th√†nh city name (offline, nhanh)
    """
    # K·∫øt h·ª£p lat/lon th√†nh list of tuples
    coords = list(zip(lat, lon))
    
    # Batch reverse geocoding
    results = rg.search(coords)
    
    # L·∫•y city name v√† country
    cities = [f"{r['name']}, {r['admin1']}, {r['cc']}" for r in results]
    
    return pd.Series(cities)

# Apply UDF cho to√†n b·ªô DataFrame
print("‚è≥ ƒêang th√™m c·ªôt city cho to√†n b·ªô DataFrame (3+ tri·ªáu records)...")
print("Ph∆∞∆°ng ph√°p offline geocoding nhanh h∆°n nhi·ªÅu so v·ªõi API!\n")

df_with_city = df_with_coords.withColumn(
    "city",
    get_city_udf(df_with_coords.latitude, df_with_coords.longitude)
)

print("‚úÖ Ho√†n th√†nh! Hi·ªÉn th·ªã k·∫øt qu·∫£:")
df_with_city.select("quadkey", "latitude", "longitude", "city", "avg_d_kbps", "avg_u_kbps").show(20, truncate=False)

‚è≥ ƒêang th√™m c·ªôt city cho to√†n b·ªô DataFrame (3+ tri·ªáu records)...
Ph∆∞∆°ng ph√°p offline geocoding nhanh h∆°n nhi·ªÅu so v·ªõi API!

‚úÖ Ho√†n th√†nh! Hi·ªÉn th·ªã k·∫øt qu·∫£:


Loading formatted geocoded file...                                  (0 + 1) / 1]


+----------------+-------------------+-------------------+---------------------------------------+----------+----------+
|quadkey         |latitude           |longitude          |city                                   |avg_d_kbps|avg_u_kbps|
+----------------+-------------------+-------------------+---------------------------------------+----------+----------+
|1202130120303121|28.793783481473696 |18.53118896484375  |Maradah, Sha`biyat al Wahat, LY        |12503     |5090      |
|0322113021201023|8.878094054931335  |-69.90875244140625 |Boconoito, Portuguesa, VE              |16109     |11204     |
|3100130002212100|-2.9290832161935425|106.98760986328125 |Kepoh, Bangka-Belitung Islands, ID     |10325     |7378      |
|1231213031333131|14.134091850936585 |77.34100341796875  |Pavagada, Karnataka, IN                |7187      |2051      |
|0230102031111210|19.58994734091339  |-122.37396240234375|Bahia Asuncion, Baja California Sur, MX|76282     |20332     |
|2112000120231213|-11.1024842332

                                                                                

In [5]:
# Th·ªëng k√™ t·ªëc ƒë·ªô Internet theo th√†nh ph·ªë
from pyspark.sql.functions import avg, sum as spark_sum, count, round as spark_round

city_stats_spark = df_with_city.groupBy("city").agg(
    spark_round(avg("avg_d_kbps"), 0).alias("avg_download_kbps"),
    spark_round(avg("avg_u_kbps"), 0).alias("avg_upload_kbps"),
    spark_round(avg("avg_lat_ms"), 0).alias("avg_latency_ms"),
    spark_sum("tests").alias("total_tests"),
    count("*").alias("tile_count")
).orderBy("avg_download_kbps", ascending=False)

print("üìä Top 30 th√†nh ph·ªë c√≥ t·ªëc ƒë·ªô download nhanh nh·∫•t:")
print("=" * 120)
city_stats_spark.show(30, truncate=False)

print("\nüìä Top 30 th√†nh ph·ªë c√≥ nhi·ªÅu tiles nh·∫•t:")
print("=" * 120)
city_stats_spark.orderBy("tile_count", ascending=False).show(30, truncate=False)

üìä Top 30 th√†nh ph·ªë c√≥ t·ªëc ƒë·ªô download nhanh nh·∫•t:


Loading formatted geocoded file...                                (0 + 12) / 12]
Loading formatted geocoded file...
Loading formatted geocoded file...                                (1 + 11) / 12]
Loading formatted geocoded file...                                 (3 + 9) / 12]
Loading formatted geocoded file...
Loading formatted geocoded file...
Loading formatted geocoded file...
Loading formatted geocoded file...
Loading formatted geocoded file...
                                                                                

+--------------------------------------------------+-----------------+---------------+--------------+-----------+----------+
|city                                              |avg_download_kbps|avg_upload_kbps|avg_latency_ms|total_tests|tile_count|
+--------------------------------------------------+-----------------+---------------+--------------+-----------+----------+
|Sestu, Sardinia, IT                               |262806.0         |33593.0        |51.0          |1          |1         |
|San Ignacio Rio Muerto, Sonora, MX                |231385.0         |28035.0        |23.0          |1          |1         |
|Szekelyhid, , RO                                  |217815.0         |43851.0        |68.0          |2          |1         |
|Pizzoli, Abruzzo, IT                              |207385.0         |47282.0        |46.0          |1          |1         |
|Trusesti, Botosani, RO                            |205090.0         |33071.0        |73.0          |1          |1         |


Loading formatted geocoded file...                                 (3 + 9) / 12]
Loading formatted geocoded file...

+----------------------------------------------------------------------+-----------------+---------------+--------------+-----------+----------+
|city                                                                  |avg_download_kbps|avg_upload_kbps|avg_latency_ms|total_tests|tile_count|
+----------------------------------------------------------------------+-----------------+---------------+--------------+-----------+----------+
|Ghat, Sha`biyat Ghat, LY                                              |28235.0          |10916.0        |48.0          |558187     |88804     |
|Illizi, Illizi, DZ                                                    |34376.0          |11258.0        |41.0          |182730     |62550     |
|Bamboo Flat, Andaman and Nicobar Islands, IN                          |10667.0          |4816.0         |93.0          |396092     |48713     |
|Cabo San Lucas, Baja California Sur, MX                               |31038.0          |10735.0        |48.0          |221175   

                                                                                

In [None]:
# L∆∞u DataFrame v·ªõi city v√†o S3 (parquet format)
output_path = "s3a://bronze/pump/2019-q1-with-city/"

print(f"üíæ ƒêang l∆∞u DataFrame v·ªõi city v√†o: {output_path}")
print(f"Total records: {df_with_city.count():,}")

# L∆∞u v√†o parquet (n√©n t·ªët, query nhanh)
df_with_city.write.mode("overwrite").parquet(output_path)

print("‚úÖ ƒê√£ l∆∞u th√†nh c√¥ng!")
print(f"\nB·∫°n c√≥ th·ªÉ ƒë·ªçc l·∫°i b·∫±ng:")
print(f"df_city = spark.read.parquet('{output_path}')")

## üìù T·ªïng k·∫øt: Transform Quadkey & Tile th√†nh Th√†nh ph·ªë

### ‚úÖ ƒê√£ ho√†n th√†nh:

1. **Chuy·ªÉn ƒë·ªïi Quadkey sang Lat/Lon**
   - S·ª≠ d·ª•ng thu·∫≠t to√°n Bing Maps quadkey
   - T·∫°o UDF ƒë·ªÉ √°p d·ª•ng cho to√†n b·ªô DataFrame

2. **Reverse Geocoding (Lat/Lon ‚Üí City)**
   - **Ph∆∞∆°ng ph√°p 1:** S·ª≠ d·ª•ng `geopy` (Nominatim API) - Ch·∫≠m, ph√π h·ª£p v·ªõi sample nh·ªè
   - **Ph∆∞∆°ng ph√°p 2:** S·ª≠ d·ª•ng `reverse_geocoder` (Offline) - **Nhanh, ƒë√£ √°p d·ª•ng cho 3+ tri·ªáu records**

3. **Th·ªëng k√™ theo th√†nh ph·ªë**
   - Top cities c√≥ t·ªëc ƒë·ªô Internet nhanh nh·∫•t
   - S·ªë l∆∞·ª£ng tiles theo th√†nh ph·ªë
   - Trung b√¨nh download/upload/latency

### üìä K·∫øt qu·∫£:
- **Dataset ban ƒë·∫ßu:** 3,231,245 records v·ªõi quadkey
- **Dataset sau transform:** 3,231,245 records v·ªõi city name (format: City, Region, Country Code)
- **T·ªëc ƒë·ªô x·ª≠ l√Ω:** ~2 ph√∫t cho 3+ tri·ªáu records (offline geocoding)

### üí° L∆∞u √Ω:
- City name ƒë∆∞·ª£c l·∫•y t·ª´ database GeoNames (offline)
- Format: `City Name, Administrative Region, Country Code`
- ƒê·ªô ch√≠nh x√°c ph·ª• thu·ªôc v√†o ƒë·ªô d√†i quadkey (zoom level)

### üöÄ Ti·∫øp theo:
- C√≥ th·ªÉ l∆∞u DataFrame v√†o S3 v·ªõi c·ªôt city
- S·ª≠ d·ª•ng cho ph√¢n t√≠ch, visualization
- Integrate v√†o pipeline ETL

---

## üöÄ Production ETL Pipeline: Bronze ‚Üí Silver

ƒê·ªÉ transform data t·ª´ Bronze bucket sang Silver bucket trong production (K8s), s·ª≠ d·ª•ng Spark job:

### File ƒë√£ t·∫°o:
1. **`spark/jobs/transform.py`**: Spark job ch√≠nh v·ªõi ƒë·∫ßy ƒë·ªß data cleaning & enrichment
2. **`k8s/spark-transform-job.yaml`**: Kubernetes manifest ƒë·ªÉ deploy job
3. **`test_transform.sh`**: Script test v√† monitor job

### Deploy Transform Job:
```bash
# 1. Build v√† load Docker image (n·∫øu ch∆∞a c√≥)
docker build -f Dockerfile.spark -t spark-s3:latest .
minikube image load spark-s3:latest

# 2. T·∫°o silver bucket
mc mb minio/silver

# 3. Submit Spark job
kubectl apply -f k8s/spark-transform-job.yaml

# 4. Monitor job
kubectl get sparkapplications
kubectl logs -f ookla-transform-job-driver

# 5. Ho·∫∑c d√πng test script
./test_transform.sh
```

### Xem k·∫øt qu·∫£ sau khi job ho√†n th√†nh:

In [4]:
# ƒê·ªçc v√† verify Silver data (sau khi transform job ho√†n th√†nh)
silver_path = "s3a://silver/pump/2019-q1/"

print(f"üìñ ƒê·ªçc d·ªØ li·ªáu t·ª´ Silver bucket: {silver_path}")

try:
    df_silver = spark.read.parquet(silver_path)
    
    print(f"\n‚úÖ Silver data loaded successfully!")
    print(f"Total records: {df_silver.count():,}\n")
    
    print("Schema:")
    df_silver.printSchema()
    
    print("\nSample data:")
    df_silver.select(
        "quadkey", "latitude", "longitude", "city",
        "download_mbps", "upload_mbps", "avg_lat_ms",
        "data_quality_score", "partition_year", "partition_month"
    ).show(10, truncate=False)
    
    print("\nData Quality Statistics:")
    df_silver.select(
        "data_quality_score", 
        "is_outlier_download", 
        "is_outlier_upload",
        "city_lookup_success"
    ).describe().show()
    
    print("\nTop 10 cities by average download speed:")
    df_silver.groupBy("city").agg(
        {"download_mbps": "avg", "quadkey": "count"}
    ).withColumnRenamed("avg(download_mbps)", "avg_download_mbps") \
     .withColumnRenamed("count(quadkey)", "tile_count") \
     .orderBy("avg_download_mbps", ascending=False) \
     .show(10, truncate=False)
    
except Exception as e:
    print(f"‚ùå Error reading silver data: {e}")
    print("\nMake sure the transform job has completed successfully.")
    print("Check job status: kubectl get sparkapplications")

üìñ ƒê·ªçc d·ªØ li·ªáu t·ª´ Silver bucket: s3a://silver/pump/2019-q1/
‚ùå Error reading silver data: [PATH_NOT_FOUND] Path does not exist: s3a://silver/pump/2019-q1.

Make sure the transform job has completed successfully.
Check job status: kubectl get sparkapplications
‚ùå Error reading silver data: [PATH_NOT_FOUND] Path does not exist: s3a://silver/pump/2019-q1.

Make sure the transform job has completed successfully.
Check job status: kubectl get sparkapplications
