In [1]:
import os
import sys
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import col, regexp_replace, udf
from pyspark.sql.types import StringType
import googlemaps
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment;
# GOOGLE_MAPS_API_KEY is read later through os.getenv.
load_dotenv()

True

In [2]:
# Local paths to the GCS connector jar and the service-account key file.
# The defaults are the original machine-specific locations; set
# GCS_CONNECTOR_JAR and/or GCS_SERVICE_ACCOUNT_KEYFILE (for example in .env)
# to run this notebook on another machine without editing the cell.
gcs_connector_jar = (
    os.getenv('GCS_CONNECTOR_JAR')
    or '/home/giuseppe/airbnb-project/gcs-connector-hadoop3-latest.jar'
)
gcs_keyfile = (
    os.getenv('GCS_SERVICE_ACCOUNT_KEYFILE')
    or '/home/giuseppe/airbnb-project/my-creds.json'
)

# Spark session configured to resolve gs:// paths through the Hadoop GCS
# connector, authenticating with the service-account key file above.
spark = (
    SparkSession.builder
    .appName("GCSFilesRead")
    .config("spark.jars", gcs_connector_jar)
    .config('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
    .config('fs.gs.auth.service.account.enable', 'true')
    .config('fs.AbstractFileSystem.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS')
    .config('google.cloud.auth.service.account.json.keyfile', gcs_keyfile)
    .getOrCreate()
)

24/04/09 13:57:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/09 13:57:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# Read the geocoded CSV back from GCS.
# This file is produced by Spark's CSV writer without a custom separator, so it
# is comma-delimited (only the raw geolocation.csv uses ';'). Reading it with
# sep=';' would collapse every row into a single column while still reporting
# the expected row count.
gcs_path = 'gs://airbnb-zoomcamp-giuseppe-7/accurated/geolocation_accurated.csv'
df = spark.read.csv(gcs_path, inferSchema=True, header=True, sep=',')

In [5]:
# Number of rows in the DataFrame loaded above
df.count()

1000

In [6]:
from pyspark.sql.functions import lit

# Load the raw, ';'-delimited geolocation CSV from GCS, keep 1000 rows and
# collapse them into a single partition.
gcs_path = 'gs://airbnb-zoomcamp-giuseppe-7/geolocation.csv'
df = (
    spark.read
    .options(inferSchema=True, header=True, sep=';')
    .csv(gcs_path)
    .limit(1000)
    .repartition(1)
)

# Google Maps clients keyed by API key, so a single client (and its HTTP
# session) is reused for every row handled by a Python process instead of
# being rebuilt on each call.
_GMAPS_CLIENTS = {}


def get_street_name(latitude, longitude):
    """Reverse-geocode a coordinate pair into a formatted address.

    Args:
        latitude: Latitude as a number or numeric string.
        longitude: Longitude as a number or numeric string.

    Returns:
        The ``formatted_address`` of the first reverse-geocoding result, or
        ``None`` when either coordinate is missing/blank, when the API returns
        no results, or when the first result has no ``formatted_address``.

    The API key is read from the ``GOOGLE_MAPS_API_KEY`` environment variable.
    """
    # A null or empty coordinate cannot be geocoded; skip the API call.
    if latitude in (None, '') or longitude in (None, ''):
        return None

    api_key = os.getenv('GOOGLE_MAPS_API_KEY')
    gmaps = _GMAPS_CLIENTS.get(api_key)
    if gmaps is None:
        gmaps = googlemaps.Client(key=api_key)
        _GMAPS_CLIENTS[api_key] = gmaps

    reverse_geocode_result = gmaps.reverse_geocode((latitude, longitude))
    if not reverse_geocode_result:
        return None
    return reverse_geocode_result[0].get('formatted_address')

# Wrap get_street_name as a Spark UDF that yields a string column
get_street_name_udf = udf(get_street_name, StringType())

# Data transformation: normalise coordinates, geocode them, clean the result
def transform_data(df):
    """Add a geocoded ``street`` column to a geolocation DataFrame.

    Steps:
      1. Replace decimal commas with dots in ``latitude`` and ``longitude``.
      2. Reverse-geocode each coordinate pair into ``street`` using
         ``get_street_name_udf``.
      3. Strip a leading run of digits followed by whitespace from ``street``.
      4. Drop the ``street_name`` column (no-op when it is absent).

    Args:
        df: Spark DataFrame with ``latitude`` and ``longitude`` columns.

    Returns:
        The transformed DataFrame, marked for caching so that later actions
        reuse the geocoded rows instead of invoking the UDF again.
    """
    df_geolocation = (
        df.withColumn("latitude", regexp_replace(col("latitude"), ",", "."))
          .withColumn("longitude", regexp_replace(col("longitude"), ",", "."))
    )

    df_geolocation = df_geolocation.withColumn(
        "street", get_street_name_udf(col("latitude"), col("longitude"))
    )
    df_geolocation = df_geolocation.withColumn('street', regexp_replace('street', r'^\d+\s+', ''))

    # Assumes street_name is the original column and should be removed
    df_geolocation = df_geolocation.drop("street_name")

    # Cache the frame that already contains the geocoded column. Caching before
    # the UDF is applied only stores the cheap input rows, so every later action
    # would repeat the external geocoding calls.
    return df_geolocation.cache()

# Run the transformation on the loaded frame
df_transformed = transform_data(df)

# show() is an action: it triggers evaluation of the transformation
# (including the geocoding UDF) and prints the first rows
df_transformed.show()


[Stage 15:>                                                         (0 + 1) / 1]

+-----------+-------------------+---------+-----------+--------------------+
| unified_id|              month| latitude|  longitude|              street|
+-----------+-------------------+---------+-----------+--------------------+
|AIR10052559|2022-12-01 00:00:00| 34.23948| -116.87991|Heavenly Valley R...|
|AIR10178668|2022-12-01 00:00:00| 34.25095| -116.94139|Point Rd, Big Bea...|
|AIR10211700|2022-12-01 00:00:00| 34.24516| -116.87618|Avalon Rd, Big Be...|
|AIR10344705|2022-12-01 00:00:00|  34.2675| -116.86089|Bear Mountain Rd,...|
|AIR10424683|2022-12-01 00:00:00| 34.25341| -116.88025|Starvation Flats ...|
|AIR10471190|2022-12-01 00:00:00| 34.23844| -116.82383|Spruce Ln, Sugarl...|
|AIR10551482|2022-12-01 00:00:00|  34.2355|  -116.8483|Klamath Rd, Big B...|
|AIR11209785|2022-12-01 00:00:00| 34.23943| -116.85551|Minton Ave, Big B...|
|AIR11458668|2022-12-01 00:00:00| 34.24263| -116.92486|Wanita Ln, Big Be...|
|AIR11529774|2022-12-01 00:00:00| 34.08019| -116.37654|Carmelita Way, Yu...|

                                                                                

In [7]:
import os
import glob

gcs_path = 'gs://airbnb-zoomcamp-giuseppe-7/geolocation.csv'
local_dir_path = 'transformed_geolocation'
local_file_final = 'transformed_geolocation.csv'

# coalesce(1) makes Spark write the DataFrame as a single part-*.csv file
# inside the output directory
df_transformed.coalesce(1).write.mode('overwrite').option('header', 'true').csv(local_dir_path)

# Locate the part file and give it a stable name inside the same directory
part_files = glob.glob(f"{local_dir_path}/part-*.csv")
if not part_files:
    # Fail here, where the cause is visible, instead of silently skipping the
    # rename and leaving no file under the expected name.
    raise FileNotFoundError(f"No part-*.csv file found in {local_dir_path}")

# Only one part-*.csv is expected after coalesce(1).
# The destination must include the directory, not just the new file name.
os.rename(part_files[0], os.path.join(os.path.dirname(part_files[0]), local_file_final))

# Remove Spark's leftover files (everything except the renamed CSV).
# The directory itself is kept because it still holds the renamed CSV.
try:
    for f in os.listdir(local_dir_path):
        if f != local_file_final:
            os.remove(os.path.join(local_dir_path, f))
except OSError as e:
    print("Error: %s : %s" % (local_dir_path, e.strerror))


                                                                                

In [10]:
from google.cloud import storage

# Upload settings: target bucket, local file to send, destination object name
bucket_name = 'airbnb-zoomcamp-giuseppe-7'
source_file_name = 'transformed_geolocation/transformed_geolocation.csv'
destination_blob_name = 'accurated/geolocation_accurated.csv'

# Create a Cloud Storage client
storage_client = storage.Client()

# Get a handle to the bucket
bucket = storage_client.bucket(bucket_name)

# Create the destination blob and upload the local file to GCS
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)

print(f"File {source_file_name} uploaded to {destination_blob_name}.")


File transformed_geolocation/transformed_geolocation.csv uploaded to accurated/geolocation_accurated.csv.
