In [2]:

import os
import gc
import sys
import pandas as pd
import ast
os.environ['PYSPARK_PYTHON'] = sys.executable 
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ["PYARROW_IGNORE_TIMEZONE"] = '1' 

# imports para trabajar con spark en local
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from pyspark.sql.types import IntegerType,StructField,StructType,StringType,LongType,MapType
from pyspark.sql import functions as F
import pyspark.pandas as ps

# imports para trabajar con GCP
from google.oauth2 import service_account
from google.cloud import translate_v2 as translate




# Credenciales para API's de GCP
SERVICE_ACCOUNT_FILE = '../../credentials/fiery-protocol-399500-f2566dd92ef4.json'
creds = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE)
translate_client = translate.Client(credentials=creds)


# Iniciamos sesión de spark.
SparkSession.stop(spk)
spk = SparkSession.builder.appName("PySpark Transformations to Populate our Data Warehouse").master("local[1]").getOrCreate()
ps.options.compute.ops_on_diff_frames = True
spk

In [14]:
psdf = ps.read_json('../../data/Google Maps/estados/review-Texas/11.json',index_col='gmap_id').reset_index()[['gmap_id', 'user_id', 'name', 'time', 'rating', 'text', 'resp']].spark.cache()
psdf.dtypes, psdf.count()

(gmap_id    object
 user_id    object
 name       object
 time        int64
 rating      int64
 text       object
 resp       object
 dtype: object,
 gmap_id    150000
 user_id    150000
 name       150000
 time       150000
 rating     150000
 text        88255
 resp        22406
 dtype: int64)

In [15]:
psdf.head(2)

Unnamed: 0,gmap_id,user_id,name,time,rating,text,resp
0,0x864c23ee6b5d9d09:0x3cc9cba7f179b2ee,102540505680898147322,Sergio Orjuela,1569461702556,5,,(We greatly appreciate you stopping by our sto...
1,0x864c23ee6b5d9d09:0x3cc9cba7f179b2ee,114879155270428006890,olivia,1620775162751,5,,"(Thank you for your support 🥰, 1622864960517)"


In [49]:
schema = StructType([
    StructField('user_id',StringType(),False),
    StructField('name',StringType(),True),
    StructField('time',LongType(),True),
    StructField('rating',IntegerType(),True),
    StructField('text',StringType(),True),
    StructField('resp',MapType(StringType(),StringType()),False),
    StructField('gmap_id',StringType(),False)
])


i = 1
df_list = []
while True:
    try:
        # Leemos los archivos en un SPARK Data Frame para poder acceder directamente a GCS
        sdf = spk.read.schema(schema).json(f'../../data/Google Maps/estados/review-California/{i}.json')[['gmap_id', 'user_id', 'name', 'time', 'rating', 'text', 'resp']].cache()
        sdf.selectExpr('cast(user_id as int) user_id').cache()
        sdf = sdf.withColumn('tiempo_respuesta',F.col('resp').getField('time')).cache()
        sdf = sdf.withColumn('tiempo_respuesta',F.col('tiempo_respuesta').cast(LongType())).cache()
        sdf = sdf.withColumn('respuesta',F.col('resp').getField('text')).sort('user_id').cache()
        sdf = sdf.withColumn('time',F.col('time').cast(LongType())).cache()
        # sdf = sdf[['index','gmap_id','user_id','name','time','rating','text','tiempo_respuesta','respuesta']].cache()
        # PANDAS API Data Frame: Paso intermedio para trabajar con los métodos de pandas pero con la potencia de spark, posteriormente guardaremos los datos en BQ después de 
        # las transformaciones...
        # sdf.count()
        psdf = sdf.pandas_api()
        sdf.unpersist()
        del sdf
        gc.collect()
        # psdf['estado'] = state
        df_list.append(psdf)
        i += 1
    except AnalysisException:
        break

psdf = ps.concat(df_list,axis=0)
del df_list
gc.collect()
psdf = psdf.reset_index()
psdf.spark.cache()
print(f'pyspark.pandas data frame persisted')

pyspark.pandas data frame persisted


In [31]:

sdf.show(truncate=True)

+------+--------------------+--------------------+--------------------+-------------+------+--------------------+----------------+--------------------+
| index|             gmap_id|             user_id|                name|         time|rating|                text|tiempo_respuesta|           respuesta|
+------+--------------------+--------------------+--------------------+-------------+------+--------------------+----------------+--------------------+
| 71227|0x80c2c5b46727768...|10000000910516696...|          Nidia Arce|1534331964232|     5|        Good serves!|            NULL|                NULL|
|106783|0x80dca0ab76284ae...|10000001110383985...|Johnathan Kirkcon...|1592880304004|     4|                NULL|            NULL|                NULL|
| 43781|0x808580e31c85111...|10000002480984360...|       Victor Medina|1493337633947|     5|Very nice establi...|   1493759986241|Thanks for the 5/...|
| 85594|0x80eacb93b18677b...|10000002997950820...|       Raychel Perez|1593401857411|   

In [32]:
sdf.printSchema()

root
 |-- index: long (nullable = false)
 |-- gmap_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- time: long (nullable = true)
 |-- rating: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- tiempo_respuesta: long (nullable = true)
 |-- respuesta: string (nullable = true)



In [38]:
sdf.count()

2700000

In [41]:
psdf.count()

index      2700000
gmap_id    2700000
user_id    2700000
name       2700000
time       2700000
rating     2700000
text       1529036
resp        245169
dtype: int64

In [103]:
languages_psdf = ps.DataFrame(translate_client.get_languages()).sort_values('name')
languages_psdf.head(2)

Unnamed: 0,language,name
0,af,Afrikaans
1,sq,Albanian


In [104]:
psdf.resp.loc[66]

'{"time":1629166810138,"text":"现在接线员是很好的英语"}'

In [54]:
psdf.resp.loc[66][30:-2]

'现在接线员是很好的英语'

In [None]:
psdf['text'] = psdf.text.fillna('nan')
sdf = sdf.withColumn("resp", F.to_json("resp"))
respuestas = sdf.pandas_api()
respuestas = respuestas[['resp']]


In [185]:
psdf['text'] = psdf.text.fillna('nan')
sdf = sdf.withColumn("resp", F.to_json("resp"))
respuestas = sdf.pandas_api()
respuestas = respuestas[['resp']]
respuestas.spark.cache()
respuestas['resp'] = respuestas.text.fillna('nan')
respuestas = respuestas.resp != 'nan'
resp_idx = sorted(respuestas.index.tolist())
print(respuestas.head(2))
resp_idx[:10]

AttributeError: 'DataFrame' object has no attribute 'text'

In [77]:
psdf.resp.loc[66][:30] + f'{translation["translatedText"]}."' + '}'

'{"time":1629166810138,"text":"The operator now speaks very good English."}'

In [79]:
ast.literal_eval(psdf.resp.loc[66][:30] + f'{translation["translatedText"]}."' + '}')

{'time': 1629166810138, 'text': 'The operator now speaks very good English.'}

In [144]:
respuestas.loc[30,'resp']

'{"time":1631072651706,"text":"Thanks so much for your business and for taking the time to post a great review. Glad to hear our crew was so helpful and friendly and that we were able to get the family outfitted quickly and efficiently. Look forward to hearing from you in the future."}'

In [146]:
psdf['idioma_respuesta'] = ps.Series([],dtype='str')
respuestas.spark.cache()
for i in resp_idx:
    print(i)
    respuesta = respuestas.loc[i,'resp'][30:-2]
    language = translate_client.detect_language(respuesta)
    if language['language'] != 'en':
        translation = translate_client.translate(respuesta, target_language='en')
        psdf.loc[i,'idioma_respuesta'] = language['language']
        psdf.loc[i,'resp'] = ast.literal_eval(respuestas.loc[i,'resp'][:30] + f'{translation["translatedText"]}."' + '}')
    else:
        psdf.loc[i,'idioma_respuesta'] = 'en'
        psdf.loc[i,'resp'] = ast.literal_eval(respuestas.loc[i,'resp'])
    if i > 100:
        break

30


SparkRuntimeException: [UNSUPPORTED_FEATURE.LITERAL_TYPE] The feature is not supported: Literal for '{time=1631072651706, text=Thanks so much for your business and for taking the time to post a great review. Glad to hear our crew was so helpful and friendly and that we were able to get the family outfitted quickly and efficiently. Look forward to hearing from you in the future.}' of class java.util.HashMap.

In [None]:
for i in range(len(respuestas)):
    respuestas['resp'].apply(lambda x: ast.literal_eval(x))
    respuestas.loc[i,'resp_time'] = respuestas.loc[i,'resp']['time']
    respuestas.loc[i,'resp_text'] = respuestas.loc[i,'resp']['text']
respuestas

In [None]:
PROJECT_ID = 'fiery-protocol-399500'
STATES = ['California'] #,'Texas'] # ,'New_York','Colorado','Georgia']
schema = StructType([
    StructField('user_id',StringType(),False),
    StructField('name',StringType(),True),
    StructField('time',LongType(),True),
    StructField('rating',IntegerType(),True),
    StructField('text',StringType(),True),
    StructField('resp',StringType(),False),
    StructField('gmap_id',StringType(),False)
])



psdfx = ps.DataFrame(columns=['gmap_id','user_id','name','time','text','rating','resp_time','resp_text'])
for state in STATES:
    i = 1
    df_list = []
    while True:
        try:
            # Leemos los archivos en un SPARK Data Frame para poder acceder directamente a GCS
            sdf = spk.read.schema(schema).json(f'../data/Google Maps/estados/review-{state}/{i}.json')[['user_id','name','time','rating','text','resp','gmap_id']].cache()
            sdf.selectExpr('cast(user_id as int) user_id')
            sdf.selectExpr('cast(null as map<string,string>) resp')
            # PANDAS API Data Frame: Paso intermedio para trabajar con los métodos de pandas pero con la potencia de spark, posteriormente guardaremos los datos en BQ después de 
            # las transformaciones...
            # sdf.count()
            psdf = sdf.pandas_api().spark.cache()
            sdf.unpersist()
            del sdf
            gc.collect()
            # psdf['time'] = ps.to_datetime(psdf['time'],unit='ms')
            psdf['estado'] = state
            psdf['resp'] = psdf.resp.fillna('nan')
            psdf['text'] = psdf.text.fillna('nan')
            df_list.append(psdf)
            i += 1
        except AnalysisException:
            break

    psdf = ps.concat(df_list,axis=0)
    psdf = psdf.reset_index(drop=True)
    del df_list
    psdf.spark.cache()
    print(f'pyspark.pandas data frame persisted - {state}')





#     # Generamos el primer grupo de transformaciones para los datos de las reviews de Maps en PANDAS API. Queda la metadata y los archivos de Yelp.
#     psdf['resp_time'] = ps.Series([],dtype='int64')
#     print('serie resp_time creada')
#     psdf['resp_text'] = ps.Series([],dtype='str')
#     psdf.spark.cache()
#     print('serie resp_text creada')
#     for i in range(len(psdf)):
#         print(i)
#         if type(psdf.loc[i,'resp']) == dict:
#             psdf.loc[i,'resp_time'] = psdf.loc[i,'resp']['time']
#             psdf.loc[i,'resp_text'] = psdf.loc[i,'resp']['text']
#         else:
#             pass
#     psdf.resp_time = psdf.resp_time.fillna(0).astype('int64')
#     psdf.resp_text = psdf.resp_text.fillna('')
#     psdf = psdf[['gmap_id','user_id','name','time','text','rating','resp_time','resp_text']]
#     psdf.spark.cache()
    
#     # Aquí concatenamos todos los archivos del estado en curso a los demás estados, para obtener una tabla total de estados.
#     psdfx = ps.concat(psdf,axis=0)
#     psdf.spark.unpersist()
#     del psdf
#     gc.collect()
#     print('pyspark.pandas data frame unpersisted and deleted')

# # Convertimos el dataframe de Pandas API on Spark a un dataframe de Spark
# sdf = psdfx.to_spark()
# del psdfx
# gc.collect()

# # Guardamos las tablas concatenadas en archivos .json en GCS.
# sdf.write.mode('overwrite').format('csv').save(f'../data/Google Maps/clean_test/estados/all_tables')