In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql.functions import when, col, countDistinct
from pyspark.sql.functions import posexplode
from pyspark.sql.functions import posexplode_outer
from pyspark.sql.functions import expr
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
import pandas as pd

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.38.1 
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::142401413602:role/service-role/AmazonSageMaker-ExecutionRole-20220518T150261
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 10618f8b-96fd-400e-b4fb-8647355d04a6
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.38.1
--enable-glue-datacatalog true
Waiting for session 10618f8b-96fd-400e-b4fb-8647355d04a6 to get into ready status...
Session 10618f8b-96fd-400e-b4fb-8647355d04a6 has been created.



In [2]:
spark = SparkSession.builder.appName("example").getOrCreate()




In [3]:
df = spark.sql(
"""

select 
 cd_cpf
, id_pessoa
, ds_classif_behavior
, ds_seg_consumo
, ds_perfil_research
, ds_persona
from customer_analytics_zone.ca_analitico_basao_crm

"""
)





In [4]:
df.show(5)

+-----------+--------------------+-------------------+--------------------+--------------------+--------------------+
|     cd_cpf|           id_pessoa|ds_classif_behavior|      ds_seg_consumo|  ds_perfil_research|          ds_persona|
+-----------+--------------------+-------------------+--------------------+--------------------+--------------------+
|08649985548|05e078a50ee1edea9...|                 14|                null|       Multiplicador|26 - Homem, Até 2...|
|09633852919|f2459ceaeea8ee564...|                  9|LOJAS DE DEPARTAM...|Multiplicador / A...|17 - Mulher, 25 a...|
|70016377699|22de21058a44a5d7f...|               null|                null|                null|17 - Mulher, 25 a...|
|27744355873|0c5eb96017dd4ff4c...|                 17|                null|       Multiplicador|22 - Mulher, 41 a...|
|07321571459|7e7a4cf9a0a96d3ea...|                 16|                null|BP - Multiplicado...|0 - Mulher, Casad...|
+-----------+--------------------+-------------------+--

In [5]:
df.head(1)

[Row(cd_cpf='08649985548', id_pessoa='05e078a50ee1edea9f634b74451f27e4a8be0b336e05d32466830849dc051322', ds_classif_behavior='14', ds_seg_consumo=None, ds_perfil_research='Multiplicador', ds_persona='26 - Homem, Até 24 anos, Nordeste')]


In [6]:
df.count()

5504274


In [7]:
df_test = df.drop('cd_cpf').sample(0.001)




In [8]:
df_test.count()

5531


In [27]:
df_test.write.partitionBy("id_pessoa").json("s3a://data-workspace-will-prod/flavia-costa/teste_json_basico")




In [9]:
df_test.show()

+--------------------+-------------------+--------------------+--------------------+--------------------+
|           id_pessoa|ds_classif_behavior|      ds_seg_consumo|  ds_perfil_research|          ds_persona|
+--------------------+-------------------+--------------------+--------------------+--------------------+
|e0231ea489a3eb6d2...|                 16|                null|Multiplicador / B...|10 - Mulher, Solt...|
|1de3caab943261288...|                 17|    SUPERMERCADO - 3|BP - Multiplicado...|18 - Homem, 25 a ...|
|cd5c2ea3ba7a59dd5...|                 14|                null|BP - Multiplicado...|31 - Mulher, 41 a...|
|d0f097ec98d040eb5...|                 17|        SERVICOS - 3|       Multiplicador|15 - Mulher, 25 a...|
|af2adbce839045e09...|               null|                null|  Barrado no Crédito|28 - Mulher, Até ...|
|8f2360be5a7b07494...|                 17|LOJAS DE DEPARTAM...|       Multiplicador|16 - Homem, 25 a ...|
|e5aa0defeca53feec...|                 17|    

In [12]:
unpivotExpr = """
stack(4
, 'ds_classif_behavior', ds_classif_behavior
, 'ds_seg_consumo', ds_seg_consumo
, 'ds_perfil_research', ds_perfil_research
, 'ds_persona', ds_persona) 
as (Modelos,Score)
"""

df_formato = df_test.select("id_pessoa", expr(unpivotExpr)) \
.withColumnRenamed('Modelos', 'id').withColumnRenamed('Score', 'score')\
.withColumn('description', \
when(col("id") == "ds_classif_behavior", "Behavior").otherwise(\
when(col("id") == "ds_seg_consumo", "Segm.MCC").otherwise(\
when(col("id") == "ds_perfil_research", "Perfis").otherwise(\
when(col("id") == "ds_persona", "Persona").otherwise("")))))

df_formato.show()

+--------------------+-------------------+--------------------+-----------+
|           id_pessoa|                 id|               score|description|
+--------------------+-------------------+--------------------+-----------+
|e0231ea489a3eb6d2...|ds_classif_behavior|                  16|   Behavior|
|e0231ea489a3eb6d2...|     ds_seg_consumo|                null|   Segm.MCC|
|e0231ea489a3eb6d2...| ds_perfil_research|Multiplicador / B...|     Perfis|
|e0231ea489a3eb6d2...|         ds_persona|10 - Mulher, Solt...|    Persona|
|1de3caab943261288...|ds_classif_behavior|                  17|   Behavior|
|1de3caab943261288...|     ds_seg_consumo|    SUPERMERCADO - 3|   Segm.MCC|
|1de3caab943261288...| ds_perfil_research|BP - Multiplicado...|     Perfis|
|1de3caab943261288...|         ds_persona|18 - Homem, 25 a ...|    Persona|
|cd5c2ea3ba7a59dd5...|ds_classif_behavior|                  14|   Behavior|
|cd5c2ea3ba7a59dd5...|     ds_seg_consumo|                null|   Segm.MCC|
|cd5c2ea3ba7

In [13]:
pandas_df = df_formato.toPandas()




In [14]:
pandas_df.head()

                                           id_pessoa  ... description
0  e0231ea489a3eb6d238f83da4b4b1212bf818d982de0d5...  ...    Behavior
1  e0231ea489a3eb6d238f83da4b4b1212bf818d982de0d5...  ...    Segm.MCC
2  e0231ea489a3eb6d238f83da4b4b1212bf818d982de0d5...  ...      Perfis
3  e0231ea489a3eb6d238f83da4b4b1212bf818d982de0d5...  ...     Persona
4  1de3caab943261288c71bc5382087bf5edf466666544c1...  ...    Behavior

[5 rows x 4 columns]


In [39]:
from pyspark.sql import functions as F
from pyspark.sql import Window




In [34]:
grouped_df = df_formato.groupby('id_pessoa').agg(F.collect_list(F.struct('id', 'description', 'score')).alias('Models'))
result_df = grouped_df.withColumn('Models', F.to_json('Models'))

# O resultado estará em formato DataFrame do PySpark
#result_df.show(truncate=False)




In [47]:
result_df.show()

+--------------------+--------------------+
|           id_pessoa|              Models|
+--------------------+--------------------+
|007fd82b3dd902cb0...|[{"id":"ds_classi...|
|021f6dfad456c79a3...|[{"id":"ds_classi...|
|029d3b7141de27acf...|[{"id":"ds_classi...|
|04654f9ab58eaa250...|[{"id":"ds_classi...|
|04dca8b520614228e...|[{"id":"ds_classi...|
|058004c1c57e80781...|[{"id":"ds_classi...|
|05e18ac22cc2518b2...|[{"id":"ds_classi...|
|05fc3a111a4a51f41...|[{"id":"ds_classi...|
|064798d580babdedf...|[{"id":"ds_classi...|
|07e48dac7411bc0e5...|[{"id":"ds_classi...|
|093b19f39d6583f8e...|[{"id":"ds_classi...|
|094181bfb96d55219...|[{"id":"ds_classi...|
|0a737c163365ffd1d...|[{"id":"ds_classi...|
|0af91b2e78cd8c388...|[{"id":"ds_classi...|
|0c239c4f79d104543...|[{"id":"ds_classi...|
|0c55fd48271aa5645...|[{"id":"ds_classi...|
|0cb681904b894003f...|[{"id":"ds_classi...|
|0d4e0418a8aef595d...|[{"id":"ds_classi...|
|0e3322da598e442df...|[{"id":"ds_classi...|
|0e3755dd76488a6bc...|[{"id":"ds

In [46]:
rdd = result_df.toJSON()
result = rdd.collect()
print(result)

ERROR: output size exceeded 6291456 bytes


In [55]:
result_df2 = result_df.groupBy("id_pessoa").agg(F.collect_list(F.struct('id_pessoa', 'Models')).alias('Geral')).withColumn('Geral', F.to_json('Geral'))




In [56]:
result_df2.show()

+--------------------+--------------------+
|           id_pessoa|               Geral|
+--------------------+--------------------+
|007fd82b3dd902cb0...|[{"id_pessoa":"00...|
|021f6dfad456c79a3...|[{"id_pessoa":"02...|
|029d3b7141de27acf...|[{"id_pessoa":"02...|
|04654f9ab58eaa250...|[{"id_pessoa":"04...|
|04dca8b520614228e...|[{"id_pessoa":"04...|
|058004c1c57e80781...|[{"id_pessoa":"05...|
|05e18ac22cc2518b2...|[{"id_pessoa":"05...|
|05fc3a111a4a51f41...|[{"id_pessoa":"05...|
|064798d580babdedf...|[{"id_pessoa":"06...|
|07e48dac7411bc0e5...|[{"id_pessoa":"07...|
|093b19f39d6583f8e...|[{"id_pessoa":"09...|
|094181bfb96d55219...|[{"id_pessoa":"09...|
|0a737c163365ffd1d...|[{"id_pessoa":"0a...|
|0af91b2e78cd8c388...|[{"id_pessoa":"0a...|
|0c239c4f79d104543...|[{"id_pessoa":"0c...|
|0c55fd48271aa5645...|[{"id_pessoa":"0c...|
|0cb681904b894003f...|[{"id_pessoa":"0c...|
|0d4e0418a8aef595d...|[{"id_pessoa":"0d...|
|0e3322da598e442df...|[{"id_pessoa":"0e...|
|0e3755dd76488a6bc...|[{"id_pess

In [None]:
#result_df.write.partitionBy("id_pessoa").json("s3a://data-workspace-will-prod/flavia-costa/teste_json_v2")

Execution Interrupted. Attempting to cancel the statement (statement_id=32)
Statement 32 has been cancelled


In [57]:
result_df2.select("Geral").write.json("s3a://data-workspace-will-prod/flavia-costa/teste_json_v4")




In [43]:
result_df2.write.partitionBy("id_pessoa").json("s3a://data-workspace-will-prod/flavia-costa/teste_json_v3")

AnalysisException: Partition column `id_pessoa` not found in schema struct<Geral:string>


In [22]:
@pandas_udf(functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    result = df.groupby(['id_pessoa']).apply(lambda x: x[['id','description','score']].to_dict('records'))
    return result

ValueError: Invalid return type: returnType can not be None


In [49]:
j = (df.groupby(['id_pessoa'])\
.apply(g)\
.reset_index()\
.rename(columns={0:'Models'})\
.to_json(orient='records'))

NameError: name 'g' is not defined


In [82]:
#j = (df.groupby(['id_pessoa'])\
#.apply(lambda x: x[['id','description','score']].to_dict('records'))\
#.reset_index()\
#.rename(columns={0:'Models'})\
#.to_json(orient='records'))

ValueError: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP.
Stopping session: 6c493b44-6982-4de3-aade-0a70d10e60f0


In [31]:
import json




In [51]:
print(json.dumps(result_df, indent=2, sort_keys=False)) # ou true para experienceid no final

TypeError: Object of type DataFrame is not JSON serializable


In [54]:
#SALVAR O DUMPS NO S3 E DEPOIS CARREGAR ABAIXO
json.dumps(json.loads(result_df2.write.json("s3a://data-workspace-will-prod/flavia-costa/teste_json_aux"), indent=2, sort_keys=False).write("s3a://data-workspace-will-prod/flavia-costa/teste_json_unif")


SyntaxError: unexpected EOF while parsing (<stdin>, line 1)


In [None]:
# Writing to sample.json
json_object = json.dumps(json.loads(j), indent=2, sort_keys=False)

with open("s3a://data-workspace-will-prod/flavia-costa/teste_json_unif/sample.json", "w") as outfile:
    outfile.write(json_object)

In [None]:
df_json = spark.read.json("s3a://data-workspace-will-prod/flavia-costa/teste_json_unif/sample.json")

In [None]:

df_json.write.partitionBy("id_pessoa").json("result")