## Data Transformation of Location Column

In [27]:
# Importing required modules
import os

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim
from geopy.point import Point
from snowflake.connector.pandas_tools import pd_writer
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

In [18]:
# Load environment variables from a .env file so os.getenv() can read them.
# Create the .env file first and define the variables that hold the following:
# Snowflake account name, Snowflake username, Snowflake password, Google account email.
# The Snowflake values are read below as snowflake_account_name,
# snowflake_user_name and snowflake_password.
# NOTE(review): no visible cell reads the Google account email - confirm it is still needed.
load_dotenv()

True

In [23]:
# Nominatim geocoder client used for the reverse lookups further down.
# NOTE(review): "user25" is a generic user agent - presumably this should be a
# name that identifies this application to the Nominatim service; confirm.
geolocator = Nominatim(user_agent="user25") # Name reported for this session

In [20]:
# Configuration for connecting to the Snowflake database
def _require_env(name):
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or empty. os.getenv() returns
            None for unset variables, which would otherwise be passed into
            the connection URL and only fail later with an unrelated error.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"Missing required environment variable '{name}'; "
            "define it in the .env file before running this notebook."
        )
    return value


# Credentials come from the environment (.env); database, schema and
# warehouse are fixed for this project.
engine = create_engine(URL(
    account=_require_env('snowflake_account_name'),
    user=_require_env('snowflake_user_name'),
    password=_require_env('snowflake_password'),
    database='realestate',
    schema='public',
    warehouse='realestate_wh'))

In [30]:
# Retrieve the address for each coordinate pair using the geopy package and save the results.
# The query returns one row per RN with LOCATION formatted as 'latitude,longitude'.
query = """ SELECT RN, concat(latitude,',',longitude) as LOCATION
                    FROM (SELECT RN
                            , SUBSTR(location, REGEXP_INSTR(location,' ',1,4)+1) AS LATITUDE 
                            , SUBSTR(location, REGEXP_INSTR(location,' ',1,1)+1, (REGEXP_INSTR(location,' ',1,2) - REGEXP_INSTR(location,' ',1,1) - 1) ) AS LONGITUDE
                        FROM otodom_data_flatten -- WHERE rn between 1 and 100 [This adjacent statement is for part by part parsing, change it acccordingly]
                        ORDER BY rn  ) """

# Nominatim's usage policy allows at most one request per second. dask may
# evaluate several partitions at once, so every lookup goes through one shared
# rate limiter. With its defaults the limiter retries geocoder service errors
# and then returns None instead of raising.
reverse_geocode = RateLimiter(geolocator.reverse, min_delay_seconds=1)


def _reverse_address(location):
    """Return the address details for a 'latitude,longitude' string.

    Args:
        location: coordinate string as produced by the query above; may be
            None/NaN when the source row has no usable coordinates.

    Returns:
        The 'address' entry of the geocoder's raw response, or None when the
        location is missing, the lookup produced no result, or the response
        has no 'address' entry.
    """
    if pd.isna(location):
        return None
    match = reverse_geocode(location)
    if match is None:
        # reverse() yields None when nothing is found for the coordinates.
        return None
    return match.raw.get('address')


try:
    # Keep the Snowflake connection open only for the read; the geocoding
    # below can run for a long time and does not need it. The `with` block
    # closes the connection, so no explicit conn.close() is required.
    with engine.connect() as conn:
        df = pd.read_sql(query, conn)

    # Normalise column names to upper case regardless of how the driver returns them.
    df.columns = [str(col).upper() for col in df.columns]

    ddf = dd.from_pandas(df, npartitions=10)
    print(ddf.head(5, npartitions=-1))

    ddf['ADDRESS'] = ddf['LOCATION'].apply(_reverse_address, meta=(None, 'str'))

    pandas_df = ddf.compute()
    print(pandas_df.head())

    pandas_df.to_csv('Otodom_Data_Transformation_Address.csv', index=False)

    # For directly inserting the transformed data in snowflake
    #pandas_df.to_sql('otodom_data_transformed_address', con=engine, if_exists='append', index=False, chunksize=16000, method=pd_writer)
except Exception as e:
    # Report the failure and re-raise so the cell is marked as failed with a
    # full traceback instead of appearing to succeed without writing the CSV.
    print('Error', e)
    raise
finally:
    # Release pooled connections whether or not the run succeeded.
    engine.dispose()