In [2]:
import pandas as pd
import numpy as np
from geopy.geocoders import Nominatim
from geopy.point import Point
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
from snowflake.connector.pandas_tools import pd_writer
import time 
import dask.dataframe as dd

In [3]:
start_time = time.time()

### Connection between python and snowflake

In [10]:
engine = create_engine(URL(
                    account = 'ZZMZZDH-AR84667',
                    user = 'yifanj4',
                    password = 'Z970710z',
                    database = 'demo',
                    schema = 'public',
                    warehouse = 'demo_wh'))

### Geodata transformation
* Read data from snowflake
* Apply geolocator function transfer data from (LATITUDE,LONGITUDE) format to actual address
* update data as table to snowflake
PS: since geolocator has limit usage, do not repeat the process too many time to meet the error

In [11]:
geolocator = Nominatim(user_agent="otodomprojectanalysis")
with engine.connect() as conn:
    try:
        query = """ SELECT RN, concat(latitude,',',longitude) as LOCATION
                    FROM (SELECT RN
                            , SUBSTR(location, REGEXP_INSTR(location,' ',1,4)+1) AS LATITUDE 
                            , SUBSTR(location, REGEXP_INSTR(location,' ',1,1)+1, (REGEXP_INSTR(location,' ',1,2) - REGEXP_INSTR(location,' ',1,1) - 1) ) AS LONGITUDE
                        FROM otodom_data_flatten
                        ORDER BY rn  ) """
        print("--- %s seconds ---" % (time.time() - start_time))
        
        df = pd.read_sql(query,conn)
                      
        df.columns = map(lambda x: str(x).upper(), df.columns)
        
        ddf = dd.from_pandas(df,npartitions=10)
        print(ddf.head(5,npartitions=-1))

        ddf['ADDRESS'] = ddf['LOCATION'].apply(lambda x: geolocator.reverse(x).raw['address'],meta=(None, 'str'))
        print("--- %s seconds ---" % (time.time() - start_time))

        pandas_df = ddf.compute()
        print(pandas_df.head())
        print("--- %s seconds ---" % (time.time() - start_time))

        pandas_df.to_sql('otodom_data_flatten_address', con=engine, if_exists='append', index=False, chunksize=16000, method=pd_writer)
    except Exception as e:
        print('--- Error --- ',e)
    finally:
        conn.close()
engine.dispose()

--- 1188.5472559928894 seconds ---
   RN                             LOCATION
0   1                    52.23614,21.00817
1   2                  52.336575,21.029306
2   3  51.10710682881388,16.94346882507325
3   4                    50.10361,20.00665
4   5                  52.336575,21.029306
--- 1189.8629581928253 seconds ---
--- Error ---  Non-successful status code 429


### Text Translate
* Import gspread library, then apply google sheet API to connect python with googlesheet
* Apply google translate to translate text data 
* load data back to snowflake

In [7]:
import gspread
from gspread_dataframe import get_as_dataframe, set_with_dataframe

In [8]:
with engine.connect() as conn:
    try:
        query = """ SELECT RN, TITLE FROM otodom_data_flatten ORDER BY rn"""

        df = pd.read_sql(query,conn)

        gc = gspread.service_account()

        loop_counter = 0
        chunk_size = 1000
        file_name = 'OTODOM_ANALYSIS_'
        user_email = 'y********5@gmail.com'

        for i in range(0,len(df),chunk_size):
            loop_counter += 1
            df_in = df.iloc[i:(i+chunk_size), :]

            spreadsheet_title = file_name + str(loop_counter)
            try:
                locals()['sh'+str(loop_counter)] = gc.open(spreadsheet_title)
            except gspread.SpreadsheetNotFound:
                locals()['sh'+str(loop_counter)] = gc.create(spreadsheet_title)

            locals()['sh'+str(loop_counter)].share(user_email, perm_type='user', role='writer')
            wks = locals()['sh'+str(loop_counter)].get_worksheet(0)
            wks.resize(len(df_in)+1)
            set_with_dataframe(wks, df_in)   
                
            column = 'C'   # Column to apply the formula 
            start_row = 2  # Starting row to apply the formula
            end_row = wks.row_count   # Ending row to apply the formula
            cell_range = f'{column}{start_row}:{column}{end_row}' 
            curr_row = start_row
            cell_list = wks.range(cell_range)
            
            for cell in cell_list:
                cell.value = f'=GOOGLETRANSLATE(B{curr_row},"pl","en")'
                curr_row += 1
                
            # Update the worksheet with the modified cells
            wks.update_cells(cell_list, value_input_option='USER_ENTERED')

            print(f'Spreadsheet {spreadsheet_title} created!')

            df_log = pd.DataFrame({'ID':[loop_counter], 'SPREADSHEET_NAME':[spreadsheet_title]})
            df_log.to_sql('otodom_data_log', con=engine, if_exists='append', index=False, chunksize=16000, method=pd_writer)


            # df_out = get_as_dataframe(wks, usecols=[0,1,2,3], nrows=end_row, header=None, skiprows=1)
            # print(f'Spreadsheet {locals()["sh"+str(loop_counter)]} loaded back to DataFrame!')
            
            # df_out.columns = ['RN', 'TITLE', 'LOCATION', 'TITLE_ENG']

            # df_out.to_sql('otodom_data_transformed', con=engine, if_exists='append', index=False, chunksize=16000, method=pd_writer)

    except Exception as e:
        print('--- Error --- ',e)
    finally:
        conn.close()
engine.dispose()
print("--- %s seconds ---" % (time.time() - start_time))

Spreadsheet OTODOM_ANALYSIS_1 created!
Spreadsheet OTODOM_ANALYSIS_2 created!
Spreadsheet OTODOM_ANALYSIS_3 created!
Spreadsheet OTODOM_ANALYSIS_4 created!
Spreadsheet OTODOM_ANALYSIS_5 created!
Spreadsheet OTODOM_ANALYSIS_6 created!
Spreadsheet OTODOM_ANALYSIS_7 created!
Spreadsheet OTODOM_ANALYSIS_8 created!
Spreadsheet OTODOM_ANALYSIS_9 created!
Spreadsheet OTODOM_ANALYSIS_10 created!
Spreadsheet OTODOM_ANALYSIS_11 created!
Spreadsheet OTODOM_ANALYSIS_12 created!
Spreadsheet OTODOM_ANALYSIS_13 created!
Spreadsheet OTODOM_ANALYSIS_14 created!
Spreadsheet OTODOM_ANALYSIS_15 created!
Spreadsheet OTODOM_ANALYSIS_16 created!
Spreadsheet OTODOM_ANALYSIS_17 created!
Spreadsheet OTODOM_ANALYSIS_18 created!
Spreadsheet OTODOM_ANALYSIS_19 created!
Spreadsheet OTODOM_ANALYSIS_20 created!
Spreadsheet OTODOM_ANALYSIS_21 created!
Spreadsheet OTODOM_ANALYSIS_22 created!
Spreadsheet OTODOM_ANALYSIS_23 created!
Spreadsheet OTODOM_ANALYSIS_24 created!
Spreadsheet OTODOM_ANALYSIS_25 created!
Spreadshe

In [9]:
with engine.connect() as conn:
    try:
        query = """ SELECT ID, SPREADSHEET_NAME FROM otodom_data_log """
        df = pd.read_sql(query,conn)
        df.columns = map(lambda x: str(x).upper(), df.columns)

        gc = gspread.service_account()
        loop_counter = 0

        for index, row in df.iterrows():
            loop_counter += 1
            locals()['sh'+str(loop_counter)] = gc.open(row['SPREADSHEET_NAME'])
            wks = locals()['sh'+str(loop_counter)].get_worksheet(0)
            df_out = get_as_dataframe(wks, usecols=[0,1,2], nrows=wks.row_count, header=None, skiprows=1, evaluate_formulas=True)
            print('Spreadsheet '+row['SPREADSHEET_NAME']+' loaded back to DataFrame!')
            
            df_out.columns = ['RN', 'TITLE', 'TITLE_ENG']
            df_out.to_sql('otodom_data_flatten_translate', con=engine, if_exists='append', index=False, chunksize=16000, method=pd_writer)

    except Exception as e:
        print('--- Error --- ',e)
    finally:
        conn.close()
engine.dispose()

print("--- %s seconds ---" % (time.time() - start_time))

Spreadsheet OTODOM_ANALYSIS_1 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_2 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_3 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_1 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_2 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_3 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_1 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_1 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_2 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_3 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_4 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_5 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_6 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_7 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_8 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_9 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_10 loaded back to DataFrame!
Spreadsheet OTODOM_ANALYSIS_11 loaded back to D