# In [None]:
import requests
import geohash
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, udf
from pyspark.sql.types import StringType
from dotenv import load_dotenv
import os


# Read variables from a .env file (when python-dotenv finds one) into the
# process environment
load_dotenv()

# Key sent with every geocoding request; None when the variable is not set
API_KEY = os.environ.get('API_KEY')

# Spark entry point shared by the rest of the script
spark = (
    SparkSession.builder
    .appName("spark_practical_task")
    .getOrCreate()
)

# Get geocode
def find_location(address, timeout=10):
    """Geocode a free-form address with the OpenCage API.

    Args:
        address: Text to look up. ``None`` or a blank string is rejected
            without calling the API.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A ``(lat, lon)`` tuple taken from the first result, or
        ``(None, None)`` when the address is blank, the request fails, or
        the response holds no usable coordinates.
    """
    import logging

    if address is None or not str(address).strip():
        return None, None

    url = "https://api.opencagedata.com/geocode/v1/json"
    params = {
        "key": API_KEY,
        "q": address,
        "limit": 1,  # only the first result is ever read
        "no_annotations": 1,  # skip extra metadata that is never used
    }
    try:
        # Without a timeout a stalled connection would block the caller
        # indefinitely.
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()
        payload = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Network failure, HTTP error status, or a body that is not JSON:
        # report "not found" so one bad lookup does not abort the whole job.
        logging.getLogger(__name__).warning(
            "Geocoding failed for %r: %s", address, exc
        )
        return None, None

    results = payload.get("results") if isinstance(payload, dict) else None
    if not results:
        return None, None

    geometry = results[0].get("geometry") or {}
    lat, lon = geometry.get("lat"), geometry.get("lng")
    if lat is None or lon is None:
        return None, None
    return lat, lon

# Generate geohash
def generate_geohash(lat, lon, precision=4):
    """Encode a coordinate pair as a geohash string.

    Args:
        lat: Latitude as a number or a numeric string.
        lon: Longitude as a number or a numeric string.
        precision: Number of geohash characters to produce.

    Returns:
        The geohash string, or ``None`` when either coordinate is missing,
        is not numeric, or lies outside the valid latitude/longitude range.
    """
    if lat is None or lon is None:
        return None

    # Values read from CSV arrive as strings, so coerce before encoding.
    try:
        lat_value, lon_value = float(lat), float(lon)
    except (TypeError, ValueError):
        return None

    # NaN fails every comparison, so it is rejected here along with
    # out-of-range coordinates.
    if not (-90.0 <= lat_value <= 90.0 and -180.0 <= lon_value <= 180.0):
        return None

    return geohash.encode(lat_value, lon_value, precision=precision)

geohash_udf = udf(generate_geohash, StringType())

# Load Restaurant Data
# No schema inference is requested, so every CSV column is read as a string.
# Cast the coordinates to double here; blank or malformed values become null
# and are handled by the geocoding step below.
restaurants_df = (
    spark.read.load(
        '/Users/kaisarsarymsakov/tech_orda_hadoop/restaurant_csv',
        format="csv",
        sep=",",
        header="true",
    )
    .withColumn("lat", col("lat").cast("double"))
    .withColumn("lng", col("lng").cast("double"))
)

# Captured as plain Python objects so the mapping function below does not
# close over the DataFrame itself.
restaurant_columns = restaurants_df.columns
restaurant_schema = restaurants_df.schema


def _fill_missing_coordinates(row):
    """Return the row's values in column order, geocoding absent coordinates.

    Rows that already have both ``lat`` and ``lng`` pass through unchanged.
    Otherwise both are replaced with the result of ``find_location`` on the
    row's address (``None`` when the lookup finds nothing).
    """
    values = row.asDict()
    if values["lat"] is None or values["lng"] is None:
        # NOTE(review): assumes the restaurant CSV has an `address` column -
        # confirm against the actual file header.
        lat, lng = find_location(values["address"])
        # JSON numbers may decode as int; the schema expects double.
        values["lat"] = None if lat is None else float(lat)
        values["lng"] = None if lng is None else float(lng)
    return tuple(values[name] for name in restaurant_columns)


# Get lat and lng for missing values. The tuples keep the original column
# order and count, so the original schema can be reused as-is.
restaurants_df = spark.createDataFrame(
    restaurants_df.rdd.map(_fill_missing_coordinates),
    schema=restaurant_schema,
)

# Generate geohash
restaurants_df = restaurants_df.withColumn(
    "geohash", geohash_udf(col("lat"), col("lng"))
)

# Load Weather Data
weather_df = spark.read.parquet('/Users/kaisarsarymsakov/tech_orda_hadoop/weather')

# Generate geohash
weather_df = weather_df.withColumn(
    "geohash", geohash_udf(col("lat"), col("lng"))
)

# Both inputs carry columns with the same name (at least lat/lng). The join
# key is merged, but any other shared name would appear twice in the result
# and the Parquet writer rejects duplicate column names, so prefix the
# weather-side copies.
shared_columns = sorted(
    (set(restaurants_df.columns) & set(weather_df.columns)) - {"geohash"}
)
for shared_name in shared_columns:
    weather_df = weather_df.withColumnRenamed(shared_name, f"weather_{shared_name}")

# Join
final_data_df = restaurants_df.join(weather_df, on="geohash", how="left")

# Save output file
# Write into a dedicated subdirectory: overwriting the parent directory would
# delete the restaurant_csv and weather inputs that live inside it.
final_data_df.write.mode("overwrite").partitionBy("geohash").parquet(
    '/Users/kaisarsarymsakov/tech_orda_hadoop/restaurant_weather'
)

# Stop the Spark session
spark.stop()
