In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import StringType
import requests
import geohash2
spark = SparkSession.builder.appName("RestaurantWeather").getOrCreate()
restaurant_data_path = '/home/jovyan/restaurant_csv/*.csv'
weather_data_path = '/home/jovyan/weather/year=2016/month=10/day=01/*.parquet'
restaurants_df = spark.read.csv(restaurant_data_path, header=True, inferSchema=True)
weather_df = spark.read.parquet(weather_data_path)

In [10]:
missing_coordinates_df = restaurants_df.filter(col("lat").isNull() | col("lng").isNull())
missing_coordinates_df.show()

+-----------+------------+--------------+-----------------------+-------+------+----+----+
|         id|franchise_id|franchise_name|restaurant_franchise_id|country|  city| lat| lng|
+-----------+------------+--------------+-----------------------+-------+------+----+----+
|85899345920|           1|       Savoria|                  18952|     US|Dillon|NULL|NULL|
+-----------+------------+--------------+-----------------------+-------+------+----+----+



In [11]:
def get_coordinates(address):
    api_key = "c7d1742380aa409a9d7fd7de3fefe564"
    url = f"https://api.opencagedata.com/geocode/v1/json?q={address}&key={api_key}"
    response = requests.get(url)
    data = response.json()
    if data["results"]:
        lat = data["results"][0]["geometry"]["lat"]
        lng = data["results"][0]["geometry"]["lng"]
        return lat, lng
    return None, None
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
get_coordinates_udf = udf(get_coordinates, StringType())
restaurants_df = restaurants_df.withColumn("coordinates", get_coordinates_udf(col("city")))
restaurants_df.show()

+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+--------------------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|    lng|         coordinates|
+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+--------------------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578|-87.021|[Ljava.lang.Objec...|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|  2.368|[Ljava.lang.Objec...|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City| 44.08|-103.25|[Ljava.lang.Objec...|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213| 16.413|[Ljava.lang.Objec...|
|163208757312|          65|       Chef's Corner|                  96638|    

In [14]:
import geohash2
def generate_geohash(latitude, longitude):
    if latitude and longitude:
        return geohash2.encode(latitude, longitude, precision=4) 
    return None

In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
generate_geohash_udf = udf(generate_geohash, StringType())
restaurants_df = restaurants_df.withColumn("geohash", generate_geohash_udf(col("lat"), col("lng")))
restaurants_df.show()

+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+--------------------+-------+
|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|    lng|         coordinates|geohash|
+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+--------------------+-------+
|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578|-87.021|[Ljava.lang.Objec...|   dn4h|
| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|  2.368|[Ljava.lang.Objec...|   u09t|
|214748364826|          27|     The Corner Cafe|                  92040|     US|    Rapid City| 44.08|-103.25|[Ljava.lang.Objec...|   9xyd|
|154618822706|          51|        The Pizzeria|                  41484|     AT|        Vienna|48.213| 16.413|[Ljava.lang.Objec...|   u2ed|
|163208757312|      

In [21]:
# Переименуем столбцы lat и lng в weather_df, чтобы избежать конфликта
weather_df = weather_df.withColumnRenamed("lat", "weather_lat") \
                       .withColumnRenamed("lng", "weather_lng")

# Объединение данных
enriched_df = restaurants_df.join(weather_df, on="geohash", how="left")

# Проверка результатов
enriched_df.show()


+-------+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+--------------------+-----------+-----------+----------+----------+---------+
|geohash|          id|franchise_id|      franchise_name|restaurant_franchise_id|country|          city|   lat|    lng|         coordinates|weather_lng|weather_lat|avg_tmpr_f|avg_tmpr_c|wthr_date|
+-------+------------+------------+--------------------+-----------------------+-------+--------------+------+-------+--------------------+-----------+-----------+----------+----------+---------+
|   dn4h|197568495625|          10|    The Golden Spoon|                  24784|     US|       Decatur|34.578|-87.021|[Ljava.lang.Objec...|       NULL|       NULL|      NULL|      NULL|     NULL|
|   u09t| 17179869242|          59|         Azalea Cafe|                  10902|     FR|         Paris|48.861|  2.368|[Ljava.lang.Objec...|       NULL|       NULL|      NULL|      NULL|     NULL|
|   9xyd|21474836482

In [22]:
enriched_df.write.partitionBy("geohash").parquet("./enriched_data")

In [27]:
#TE
import unittest
class TestGeohash(unittest.TestCase):
    def test_generate_geohash(self):
        # Тестирование с использованием точности до 4 символов (или 6, если нужно)
        self.assertEqual(generate_geohash(40.7128, -74.0060), "dr5r")  # или "dr5ru7", в зависимости от точности
        self.assertIsNone(generate_geohash(None, None))  # Проверка на None

# Для запуска тестов из Jupyter, используйте следующий код:
if __name__ == "__main__":
    unittest.main(argv=['first-arg-is-ignored'], exit=False)


.
----------------------------------------------------------------------
Ran 1 test in 0.014s

OK
