In [7]:
conda install -n data_analytics_project  -c conda-forge snowflake-sqlalchemy

Collecting package metadata (current_repodata.json): ...working... done
Note: you may need to restart the kernel to use updated packages.

Solving environment: ...working... done

# All requested packages already installed.

Retrieving notices: ...working... done




  current version: 22.9.0
  latest version: 23.9.0

Please update conda by running

    $ conda update -n base -c defaults conda




In [8]:
!pip install snowflake-sqlalchemy




[notice] A new release of pip is available: 23.2.1 -> 23.3
[notice] To update, run: python.exe -m pip install --upgrade pip




In [9]:
!pip install pandas




[notice] A new release of pip is available: 23.2.1 -> 23.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [12]:
!pip install snowflake-connector-python




[notice] A new release of pip is available: 23.2.1 -> 23.3
[notice] To update, run: python.exe -m pip install --upgrade pip





Import Packages

In [13]:
import pandas as pd
import numpy as np
from geopy.geocoders import Nominatim
from geopy.point import Point
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
from snowflake.connector.pandas_tools import pd_writer
import time 
import dask.dataframe as dd

# Google Maps Geocoding API
from geopy.geocoders import GoogleV3

NOTE: There is a limitation on how many times we call the geopy api in a day. We are working with only 1000 records so it is not a problem for us, however, if working with 500K or more records, run in batches multiple times taking 20K or 30K records.

NOTE: **Dask DataFrame** is a parallel and distributed computing library in Python that provides a way to work with larger-than-memory, out-of-core datasets. It is part of the Dask project, which aims to scale and parallelize various Python libraries and functions for high-performance computing

In [17]:
start_time = time.time()
# this is a geolocator that will take longitude and latitude as an input and return address as a dictionary
#geolocator = Nominatim(user_agent="otodomproject") # this is like a user agent

# since Nminatim has limited request access we will use Google Maps Geocoding API
# Define your Google Maps API key
api_key = 'YOUR_GOOGLE_API_KEY'

# Create a Google Maps geolocator instance
geolocator = GoogleV3(api_key=api_key)


# Connecting Python to Snowflake
engine = create_engine(URL(
    account='*************',
    user='amritaneogi',
    password='****************',
    database='HOUSE_PRICE',
    schema='PUBLIC',
    warehouse='PRICE_WH'))

# Establishing a connection and creating a variable that will store the connection
with engine.connect() as conn:
    try:
        # SQL query fetching the location data
        query = """
            SELECT ID, concat(latitude, ',', longitude) as LOCATION
            FROM (SELECT ID
                , SUBSTR(location, REGEXP_INSTR(location, ' ', 1, 4) + 1) AS LATITUDE
                , SUBSTR(location, REGEXP_INSTR(location, ' ', 1, 1) + 1, (REGEXP_INSTR(location, ' ', 1, 2) - REGEXP_INSTR(location, ' ', 1, 1) - 1)) AS LONGITUDE
            FROM otodom_data_flatten WHERE ID between 1 and 100
            ORDER BY ID
            ) """
        print("--- %s seconds ---" % (time.time() - start_time))

       # loading the result in a panda dataframe
        df = pd.read_sql(query,conn) # this dataframe has 2 columns ID and Location
        
        # making the column names into upper case
        # because later we use to_sql() which expects all columns to be in upper case 
        df.columns = map(lambda x: str(x).upper(), df.columns)

        # loading everything from pandas dataframe to dask datafame to improve the performance
        # dask dataframe is very similar to pandas datafame, except it can run multiple pandas dataframe parallely in a single instance
        ddf = dd.from_pandas(df,npartitions=10) # created 10 partitions
        print(ddf.head(5,npartitions=-1))

        # Adding a 3rd column Address
        # geolocator is an API with calls the function reverse()
        # geolocator.reverse() accepts latitude and longitude and return a dictionary with the adress. It's going to have city, country, pincode etc.
        # meta() will treat all the output from dask as a string
        ddf['ADDRESS'] = ddf['LOCATION'].apply(lambda x: geolocator.reverse(x), meta=(None, 'str'))
        # Extract relevant information and convert it to a string
        ddf['ADDRESS'] = ddf['ADDRESS'].astype(str)

        print("--- %s seconds ---" % (time.time() - start_time))

        # We have used dask only for the above operation, since it is very expensive and takes a lot of time 
        
        # moving the dask datafrmae to pandas dataframe
        pandas_df = ddf.compute()  #compute() will load the data
        print(pandas_df.head())
        print("--- %s seconds ---" % (time.time() - start_time))

        # Loading the data from the pandas dataframe to Snowflake
        pandas_df.to_sql('OTODOM_DATA_FLATTEN_ADDRESS', con=engine, if_exists='append', index=False)

    except Exception as e:
        print('--- Error --- ', e)
    finally:
        conn.close()
engine.dispose()

print("--- %s seconds ---" % (time.time() - start_time))


--- 1.9342389106750488 seconds ---
   ID                             LOCATION
0   1                    52.23614,21.00817
1   2                  52.336575,21.029306
2   3  51.10710682881388,16.94346882507325
3   4                    50.10361,20.00665
4   5                  52.336575,21.029306
--- 2.419481039047241 seconds ---
   ID                             LOCATION  \
0   1                    52.23614,21.00817   
1   2                  52.336575,21.029306   
2   3  51.10710682881388,16.94346882507325   
3   4                    50.10361,20.00665   
4   5                  52.336575,21.029306   

                                             ADDRESS  
0         Marszałkowska 138, 00-004 Warszawa, Poland  
1                  DW633 94, 03-044 Warszawa, Poland  
2              Graniczna 2aa, 54-516 Wrocław, Poland  
3  Osiedle Bohaterów Września 82P, 31-620 Kraków,...  
4                  DW633 94, 03-044 Warszawa, Poland  
--- 4.161579132080078 seconds ---




--- 10.388479471206665 seconds ---
