In [11]:
%reset -f

In [12]:
import pandas as pd
import numpy as np
import json
pd.set_option('display.max_columns', None)
import re

import warnings
warnings.filterwarnings('ignore')

from tqdm import tqdm
tqdm.pandas()

from datetime import datetime

from pandarallel import pandarallel
pandarallel.initialize(progress_bar=False,nb_workers=2)

INFO: Pandarallel will run on 2 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.


In [13]:
import google.auth
from google.oauth2 import service_account
from google.cloud import bigquery

# Authenticate with the service-account key file and target the Cashbac project.
project_id = 'cashbac-31433'
credentials = service_account.Credentials.from_service_account_file('Cashbac-GCP-Keys.json')

# BigQuery client used by the query cells of this notebook.
cashbac_bqclient = bigquery.Client(project=project_id, credentials=credentials)

# IMPORT BQ NEW WAY

In [14]:
# Bring the project helper module into scope. The star-import exposes its
# public names directly (this notebook calls upload_file_to_GCS and
# create_table_from_gcs from it).
import MyFunc
# from MyFunc import cleaning_data_merchants,filtering_data_merchants,get_post_code,upload_file_to_GCS,create_table_from_gcs,is_on_radius,get_hot_area
from MyFunc import *

# reload() re-executes the module; the star-import is repeated afterwards so
# the notebook's names point at the reloaded definitions rather than the ones
# bound by the first import.
# NOTE(review): the output of this cell shows pandarallel being initialised
# again, presumably by MyFunc itself at import time - confirm.
from importlib import reload
reload(MyFunc)
from MyFunc import *
# cleaning_data_merchants,filtering_data_merchants,get_post_code,upload_file_to_GCS,create_table_from_gcs,is_on_radius,get_hot_area

INFO: Pandarallel will run on 2 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.


In [15]:
def split_indo_foreign_text(input_text):
    """Separate a review into its Indonesian part and its foreign-language part.

    The input is converted with ``str()`` and searched for two literal markers:
    ``(Asli)`` ("original") and ``(Diterjemahkan oleh Google)`` ("translated by
    Google").

    * ``(Asli)`` present: the translation marker is removed, the text is split
      on ``(Asli)``; the first piece is the Indonesian text, the second piece
      the foreign text.
    * only ``(Diterjemahkan oleh Google)`` present: the text is split on that
      marker; the piece after it is the Indonesian text, the piece before it
      the foreign text.
    * neither marker: the whole text is the Indonesian text and the foreign
      text is ``None``.

    Returns:
        pd.Series: two elements, ``[indo_text, foreign_text]``.
    """
    original_marker = '(Asli)'
    translated_marker = '(Diterjemahkan oleh Google)'
    text = str(input_text)

    # Review carries the original after an "(Asli)" marker.
    if original_marker in text:
        pieces = text.replace(translated_marker, '').split(original_marker)
        return pd.Series([pieces[0].strip(), pieces[1].strip()])

    # Only the translation marker is present: the original comes first.
    if translated_marker in text:
        pieces = text.replace('(Asli) ', '').split(translated_marker)
        return pd.Series([pieces[1].strip(), pieces[0].strip()])

    # No marker at all: treat everything as Indonesian text.
    return pd.Series([text, None])

def cleaning_reviews(data, insert_position=15):
    """Normalise a raw reviews frame and split the review text by language.

    Processing steps:

    1. Column names are lower-cased and spaces replaced with ``_``.
    2. In every text column ``;``, ``"``, ``\\r`` and ``\\t`` are removed and
       ``\\n`` is replaced by a space (these characters clash with the
       ``;``-separated CSV written by the caller).
    3. ``review_text`` is cast to ``str`` and stripped of control characters.
    4. ``indo_text`` / ``foreign_text`` are derived from ``review_text`` with
       ``split_indo_foreign_text``.
    5. The two derived columns are placed at ``insert_position`` among the
       other columns.

    Args:
        data: DataFrame of reviews; must yield a ``review_text`` column after
            renaming.
        insert_position: index at which the two derived columns are inserted.
            The default of 15 matches the previously hard-coded position.

    Returns:
        A new DataFrame; the frame passed in is left unmodified.
    """
    # Work on a copy so the caller's frame is not renamed/rewritten in place.
    data = data.copy()

    # Rename first: cleaning must address columns by the name the frame
    # actually has. Looking up the lower-cased name before renaming raised
    # KeyError for any column containing upper-case letters, and the bare
    # except silently skipped its cleaning.
    data.columns = [str(name).lower().replace(' ', '_') for name in data.columns]

    for column in data.columns:
        try:
            # NOTE: in an object column, elements that are not str come back
            # as NaN from .str.replace (pandas behaviour, unchanged here).
            data[column] = (
                data[column]
                .str.replace(';', '', regex=False)
                .str.replace('"', '', regex=False)
                .str.replace('\n', ' ', regex=False)
                .str.replace('\r', '', regex=False)
                .str.replace('\t', '', regex=False)
            )
        except AttributeError:
            # Raised when the column has no usable .str accessor (numeric,
            # datetime, duplicated names, ...); such columns are left as is.
            continue

    data['review_text'] = data['review_text'].astype(str).apply(
        lambda text: re.sub(r'[\x00-\x1f\x7f-\x9f]', '', text)
    )

    split_columns = ['indo_text', 'foreign_text']
    if len(data) == 0:
        # Nothing to split; still provide both columns so the layout is stable.
        for name in split_columns:
            data[name] = None
    else:
        data[split_columns] = data['review_text'].parallel_apply(split_indo_foreign_text)

    # Reorder using only the non-derived columns so that narrow frames (fewer
    # than insert_position columns) do not end up with duplicated entries.
    base_columns = [name for name in data.columns if name not in split_columns]
    ordered_columns = (
        base_columns[:insert_position] + split_columns + base_columns[insert_position:]
    )
    return data[ordered_columns]

In [16]:
%%time

# Look up the column names of the destination reviews table. The ingestion
# cell uses df_temp.column_name to export only columns the table already has.
query_string = """ 

SELECT column_name
FROM cashbac-31433.cashbac_datalake_prod.INFORMATION_SCHEMA.COLUMNS
WHERE table_name = 'outcrapper_google_reviews_all'

"""

query_result = list(cashbac_bqclient.query(query_string).result(timeout=None))
print(len(query_result))

if not query_result:
    # Previously df_temp was simply left unbound (or stale from an earlier
    # run) in this case, and the problem only surfaced later in the loop.
    raise RuntimeError(
        "No columns returned for table 'outcrapper_google_reviews_all' in "
        "cashbac_datalake_prod; cannot determine which columns to upload."
    )

# One row per column of the table; the frame's column names come from the
# keys of the first result row.
df_temp = pd.DataFrame(
    data=[list(row.values()) for row in query_result],
    columns=list(query_result[0].keys()),
)

1
CPU times: user 81.8 ms, sys: 14 ms, total: 95.7 ms
Wall time: 877 ms


In [17]:
%%time
import os
from google.cloud import bigquery

# Service-account key files handed to the MyFunc upload helpers.
cashbac_key='Cashbac-GCP-Keys.json'
horego_key='Horego-GCP-Keys.json'

# Only files whose name starts with a YYYYMMDD stamp on or after this date
# are processed.
scrappe_date='20231206'

path = 'outcrapper_raw/reviews/'
files = sorted(f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f)))

# df_temp=pd.read_csv('results/clear_reviews_table.csv',sep=';', nrows=1)
# my_schema=[]
# for column in df_temp.columns :
#     my_schema.append(bigquery.SchemaField(column, "STRING"))

# Destination settings are the same for every file, so they are set once.
bucket_name='cashbac_datalake'
src_path_file='results/temp_upload_review.csv'
table_schema = "cashbac_datalake_prod"
table_name = "outcrapper_google_reviews_all"

# Bound up front so the final `del data` is valid even when no file qualifies.
data = None

for file in files:
    # Skip anything that is not a .json file with an 8-digit date prefix
    # instead of crashing in int() on an unexpected file name.
    if not (file.endswith('.json') and file[:8].isdecimal()):
        continue
    if int(file[:8]) < int(scrappe_date):
        continue

    print(file)
    # Context manager closes the handle; it used to stay open for every file.
    with open(os.path.join(path, file)) as f:
        data = json.load(f)
    data = pd.DataFrame.from_dict(data)
    # .copy() so the column assignments below act on an independent frame
    # rather than on a filtered slice.
    data = data[data.review_id!='__NO_REVIEWS_FOUND__'].copy()

    # The job id is the part of the file name before the first underscore;
    # its first eight characters are the date stamp.
    data['outcrapper_job_id']=file.split('_')[0]
    data['scrappe_date'] = datetime.strptime(file[:8], '%Y%m%d').strftime('%Y-%m-%d')
    data=cleaning_reviews(data)

    # Export only columns that exist in the destination table (df_temp comes
    # from the schema-lookup cell); every column is declared as STRING.
    save_columns = [column for column in df_temp.column_name if column in data.columns]
    my_schema = [bigquery.SchemaField(column, "STRING") for column in save_columns]

    data[save_columns].to_csv(src_path_file,sep=';',index=False)

    #############################################################################################
    #BQ DATALAKE HOREGO
    # src_path_file='results/temp_upload_review.csv'
    # target_path_file=f"scrapping_results/reviews/{file.replace('.json','.csv')}"
    # bucket_name='horego-bq'
    # upload_file_to_GCS(horego_key,bucket_name,src_path_file,target_path_file)

    # table_schema = "horego_datalake_dev"
    # table_name = "outcrapper_google_reviews_all"
    # gcs_path_uri = f"gs://{bucket_name}/scrapping_results/reviews/{file.replace('.json','.csv')}"
    # create_table_from_gcs(horego_key,gcs_path_uri,table_schema,table_name,my_schema,'append')

    #############################################################################################
    #############################################################################################
    #BQ DATALAKE CASHBAC
    target_path_file=f"scrapping_results/reviews/{file.replace('.json','.csv')}"
    upload_file_to_GCS(cashbac_key,bucket_name,src_path_file,target_path_file)

    gcs_path_uri = f"gs://{bucket_name}/{target_path_file}"
    create_table_from_gcs(cashbac_key,gcs_path_uri,table_schema,table_name,my_schema,'append')

    #############################################################################################

# Drop the reference to the last processed frame.
del data
print('DONE')

202312071020163808_reviews_outlet_submission_7_des.json


VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=202), Label(value='0 / 202'))), HB…

Uploading results/temp_upload_review.csv
Path in GCS: cashbac_datalake/scrapping_results/reviews/202312071020163808_reviews_outlet_submission_7_des.csv
table on : cashbac_datalake_prod.outcrapper_google_reviews_all
DONE
CPU times: user 212 ms, sys: 51.7 ms, total: 263 ms
Wall time: 3.36 s


In [18]:
# Fetch the distinct (google_id, place_id) pairs that have reviews scraped on
# or after scrappe_date (set in the ingestion cell, format YYYYMMDD).
date_format = '%Y%m%d'
min_scrappe_date = datetime.strptime(scrappe_date, date_format).strftime('%Y-%m-%d')

query=f""" 

SELECT distinct google_id,place_id
FROM `cashbac-31433.cashbac_datalake_prod.vw_outcrapper_google_reviews_all` 
where scrappe_date >= '{min_scrappe_date}'

"""

query_result = list(cashbac_bqclient.query(query).result(timeout=None))
print(len(query_result))

# Always bind `data`: the ingestion cell deletes it, so skipping the
# assignment on an empty result left the name undefined for the next cell.
# With no rows the frame is empty but still has the selected columns.
result_columns = list(query_result[0].keys()) if query_result else ['google_id', 'place_id']
data = pd.DataFrame(
    data=[list(row.values()) for row in query_result],
    columns=result_columns,
)

6522


In [19]:
%%time

# Look up the column names of the list_insert_to_prod table; only columns
# present there are exported.
query_string = """ 

SELECT column_name
FROM cashbac-31433.cashbac_datalake_prod.INFORMATION_SCHEMA.COLUMNS
WHERE table_name = 'list_insert_to_prod'

"""

query_result = list(cashbac_bqclient.query(query_string).result(timeout=None))
print(len(query_result))

if not query_result:
    # query_result[0] below would otherwise fail with a bare IndexError.
    raise RuntimeError(
        "No columns returned for table 'list_insert_to_prod' in "
        "cashbac_datalake_prod; cannot determine which columns to upload."
    )

df_temp = pd.DataFrame(
    data=[list(row.values()) for row in query_result],
    columns=list(query_result[0].keys()),
)

# Both date columns carry scrappe_date reformatted as YYYY-MM-DD (this is the
# configured scrape date, not the current time).
input_date = datetime.strptime(scrappe_date, '%Y%m%d').strftime('%Y-%m-%d')
data['input_places_date'] = input_date
data['input_photo_date'] = input_date

# Keep the table's column order; every exported column is declared as STRING.
save_columns = [column for column in df_temp.column_name if column in data.columns]
my_schema = [bigquery.SchemaField(column, "STRING") for column in save_columns]

data[save_columns].drop_duplicates().to_csv('results/input_list_insert_to_prod.csv',sep=';',index=False)

5
CPU times: user 44.7 ms, sys: 15.7 ms, total: 60.4 ms
Wall time: 628 ms


In [20]:
# Key files for the two GCP projects; only the Cashbac key is used below.
cashbac_key = 'Cashbac-GCP-Keys.json'
horego_key = 'Horego-GCP-Keys.json'

#############################################################################################

# df_temp=pd.read_csv('results/choosen_photos.csv',sep=';', nrows=1)
# my_schema=[]
# for column in df_temp.columns :
#     my_schema.append(bigquery.SchemaField(column, "STRING"))

# Step 1: copy the CSV written by the previous cell into the GCS bucket.
bucket_name = 'cashbac_datalake'
src_path_file = 'results/input_list_insert_to_prod.csv'
target_path_file = 'scrapping_results/input_list_insert_to_prod.csv'
upload_file_to_GCS(cashbac_key, bucket_name, src_path_file, target_path_file)

# Step 2: hand the uploaded object to the MyFunc table helper together with
# my_schema (built in the previous cell) and the 'append' mode argument.
table_name = "list_insert_to_prod"
table_schema = "cashbac_datalake_prod"
gcs_path_uri = f"gs://{bucket_name}/scrapping_results/input_list_insert_to_prod.csv"
create_table_from_gcs(cashbac_key, gcs_path_uri, table_schema, table_name, my_schema, 'append')
print('DONE')

Uploading results/input_list_insert_to_prod.csv
Path in GCS: cashbac_datalake/scrapping_results/input_list_insert_to_prod.csv
table on : cashbac_datalake_prod.list_insert_to_prod
DONE
