In [67]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import Row
from pyspark.sql import Column as c


from minio.commonconfig import CopySource  
import requests
spark = SparkSession.builder.appName('load').enableHiveSupport().getOrCreate()

In [68]:
from minio import Minio
cliente = Minio("minio:9000",
access_key = "datalake",
secret_key = "datalake",
secure=False
)

In [69]:
# Lendo o arquivo de texto diretamente em um DataFrame  
df = spark.read.text("s3a://raw/api/TransSP/previsao_1c4b342d-cc23-4867-87d2-f657816219b4.json")

+--------------------+
|               value|
+--------------------+
|{".0.hr":"23:01",...|
+--------------------+



In [None]:
df_cleaned = df.withColumn("value", f.regexp_replace("value", "\"", "")) 

In [None]:
df_cleaned.show()

+--------------------+
|               value|
+--------------------+
|{.0.hr:23:01,.0.p...|
+--------------------+



In [72]:
df_rows = df_cleaned.select(f.explode(f.split("value", ",")).alias("value")) 

In [73]:
df_rows.show()

+--------------------+
|               value|
+--------------------+
|        {.0.hr:23:01|
|.0.ps.0.cp:720015611|
|         .0.ps.0.np:|
|.0.ps.0.py:-23.62...|
|.0.ps.0.px:-46.70...|
|.0.ps.0.vs.0.p:71250|
|.0.ps.0.vs.0.t:23:12|
| .0.ps.0.vs.0.a:true|
|.0.ps.0.vs.0.ta:2...|
|.0.ps.0.vs.0.py:-...|
|.0.ps.0.vs.0.px:-...|
|.0.ps.0.vs.0.sv:null|
|.0.ps.0.vs.0.is:null|
|.0.ps.0.vs.1.p:71173|
|.0.ps.0.vs.1.t:23:14|
| .0.ps.0.vs.1.a:true|
|.0.ps.0.vs.1.ta:2...|
|.0.ps.0.vs.1.py:-...|
|.0.ps.0.vs.1.px:-...|
|.0.ps.0.vs.1.sv:null|
+--------------------+
only showing top 20 rows



In [139]:
  
# Primeiro, adicione a coluna 'pos' que contém a posição do caractere ':'  
df_intermediate = df_rows.withColumn("pos", f.expr("instr(value, ':')"))  
  
# Agora, você pode utilizar a coluna 'pos' para extrair 'key' e 'value'  
df_columns = df_intermediate.withColumn("key", f.expr("substring(value, 1, pos - 1)")).withColumn("value", f.expr("substring(value, pos + 1)")).drop("pos")  # Remove a coluna 'pos' após seu uso 

In [140]:
df_columns.show()

+--------------------+---------------+
|               value|            key|
+--------------------+---------------+
|               23:01|         {.0.hr|
|           720015611|     .0.ps.0.cp|
|                    |     .0.ps.0.np|
|          -23.620215|     .0.ps.0.py|
|          -46.700154|     .0.ps.0.px|
|               71250| .0.ps.0.vs.0.p|
|               23:12| .0.ps.0.vs.0.t|
|                true| .0.ps.0.vs.0.a|
|2025-01-30T02:00:42Z|.0.ps.0.vs.0.ta|
|  -23.64686166666667|.0.ps.0.vs.0.py|
|  -46.74698333333334|.0.ps.0.vs.0.px|
|                null|.0.ps.0.vs.0.sv|
|                null|.0.ps.0.vs.0.is|
|               71173| .0.ps.0.vs.1.p|
|               23:14| .0.ps.0.vs.1.t|
|                true| .0.ps.0.vs.1.a|
|2025-01-30T02:00:57Z|.0.ps.0.vs.1.ta|
| -23.649099999999997|.0.ps.0.vs.1.py|
|           -46.75364|.0.ps.0.vs.1.px|
|                null|.0.ps.0.vs.1.sv|
+--------------------+---------------+
only showing top 20 rows



In [141]:
df_exploded = df_columns.withColumn("split_key", f.split(df_columns["key"], "\\.")).select(  
    "value",  
    "key",  
    # Agora, você deve ser capaz de acessar 'split_key' diretamente, pois está operando no DataFrame retornado por withColumn  
    f.col("split_key").getItem(1).alias("index_0"),  
    f.col("split_key").getItem(2).alias("key_0"),  
    f.col("split_key").getItem(3).alias("index_1"),  
    f.col("split_key").getItem(4).alias("key_1"), 
    f.col("split_key").getItem(5).alias("index_2"),
    f.col("split_key").getItem(6).alias("key_2") 
)

df_exploded.show()

+--------------------+---------------+-------+-----+-------+-----+-------+-----+
|               value|            key|index_0|key_0|index_1|key_1|index_2|key_2|
+--------------------+---------------+-------+-----+-------+-----+-------+-----+
|               23:01|         {.0.hr|      0|   hr|   null| null|   null| null|
|           720015611|     .0.ps.0.cp|      0|   ps|      0|   cp|   null| null|
|                    |     .0.ps.0.np|      0|   ps|      0|   np|   null| null|
|          -23.620215|     .0.ps.0.py|      0|   ps|      0|   py|   null| null|
|          -46.700154|     .0.ps.0.px|      0|   ps|      0|   px|   null| null|
|               71250| .0.ps.0.vs.0.p|      0|   ps|      0|   vs|      0|    p|
|               23:12| .0.ps.0.vs.0.t|      0|   ps|      0|   vs|      0|    t|
|                true| .0.ps.0.vs.0.a|      0|   ps|      0|   vs|      0|    a|
|2025-01-30T02:00:42Z|.0.ps.0.vs.0.ta|      0|   ps|      0|   vs|      0|   ta|
|  -23.64686166666667|.0.ps.

In [142]:
df_normalized = df_exploded.withColumn("key_concat", f.concat_ws("_", df_exploded["key_0"], df_exploded["key_1"], df_exploded["key_2"]))

df_normalized.show()

+--------------------+---------------+-------+-----+-------+-----+-------+-----+----------+
|               value|            key|index_0|key_0|index_1|key_1|index_2|key_2|key_concat|
+--------------------+---------------+-------+-----+-------+-----+-------+-----+----------+
|               23:01|         {.0.hr|      0|   hr|   null| null|   null| null|        hr|
|           720015611|     .0.ps.0.cp|      0|   ps|      0|   cp|   null| null|     ps_cp|
|                    |     .0.ps.0.np|      0|   ps|      0|   np|   null| null|     ps_np|
|          -23.620215|     .0.ps.0.py|      0|   ps|      0|   py|   null| null|     ps_py|
|          -46.700154|     .0.ps.0.px|      0|   ps|      0|   px|   null| null|     ps_px|
|               71250| .0.ps.0.vs.0.p|      0|   ps|      0|   vs|      0|    p|   ps_vs_p|
|               23:12| .0.ps.0.vs.0.t|      0|   ps|      0|   vs|      0|    t|   ps_vs_t|
|                true| .0.ps.0.vs.0.a|      0|   ps|      0|   vs|      0|    a|

In [143]:
df_index_0 = df_normalized.select("value", "index_0", "key_concat").where(c.isNull(df_normalized.index_1) & (df_normalized.value != '[]'))
df_index_1 = df_normalized.select("value", "index_0", "index_1", "key_concat").where(c.isNull(df_normalized.index_2) & ~ c.isNull(df_normalized.index_1))

In [144]:
df_index_2 = df_normalized.select("value", "index_0", "index_1", "index_2", "key_concat").where(~ c.isNull(df_normalized.index_2) & ~ c.isNull(df_normalized.index_1))

In [145]:
df_index_0.show()

+-----+-------+----------+
|value|index_0|key_concat|
+-----+-------+----------+
|23:01|      0|        hr|
|23:01|      1|        hr|
|23:01|      2|        hr|
|23:01|      3|        hr|
|23:01|      4|        hr|
|23:01|      5|        hr|
|23:01|      6|        hr|
|23:01|      7|        hr|
|23:01|      8|        hr|
|23:01|      9|        hr|
|23:01|     10|        hr|
|23:01|     11|        hr|
|23:01|     12|        hr|
|23:01|     13|        hr|
|23:01|     14|        hr|
|23:01|     15|        hr|
|23:01|     16|        hr|
|23:01|     17|        hr|
|23:01|     18|        hr|
|23:01|     19|        hr|
+-----+-------+----------+
only showing top 20 rows



In [146]:
df_1 = df_index_0.groupBy("index_0").pivot("key_concat").agg(f.first("value"))

In [147]:
df_1.show()

+-------+-----+
|index_0|   hr|
+-------+-----+
|      0|23:01|
|      1|23:01|
|     10|23:01|
|    100|23:01|
|   1000|23:04|
|   1001|23:04|
|   1002|23:04|
|   1003|23:04|
|    101|23:01|
|    102|23:01|
|    103|23:01|
|    104|23:01|
|    105|23:01|
|    106|23:01|
|    107|23:01|
|    108|23:01|
|    109|23:01|
|     11|23:01|
|    110|23:01|
|    111|23:01|
+-------+-----+
only showing top 20 rows



In [148]:
df_2 = df_index_1.groupBy("index_0", "index_1").pivot("key_concat").agg(f.first("value"))

In [149]:
df_2.show()

+-------+-------+---------+--------------------+----------+----------+-----+
|index_0|index_1|    ps_cp|               ps_np|     ps_px|     ps_py|ps_vs|
+-------+-------+---------+--------------------+----------+----------+-----+
|      0|      0|720015611|                    |-46.700154|-23.620215| null|
|      0|      1|340015493|                    |-46.692094|-23.592919| null|
|      0|     10|450011849|PARADA HOSPITAL C...|-46.748656|-23.648242| null|
|      0|     11|  3407076|Parada Marina Cin...|-46.672544|-23.577172| null|
|      0|     12|670016557|PARADA MUSEU JUDA...| -46.64583|-23.551241| null|
|      0|     13|190011831|PARADA NICOLINO B...|-46.757487|-23.651334|   []|
|      0|      2|440015158| ESTADOS UNIDOS B\/C|-46.661799|-23.570012| null|
|      0|      3| 70016561| GETULIO VARGAS B\/C|-46.653843|-23.558836| null|
|      0|      4|440015164|        GUIANAS B\/C|-46.667594|-23.574389| null|
|      0|      5|440015161|JOSE MARIA LISBOA...|-46.658743|-23.566834| null|

In [150]:
df_3 = df_index_2.groupBy("index_0", "index_1", "index_2").pivot("key_concat").agg(f.first("value"))
df_3.show()

+-------+-------+-------+-------+--------+-------+------------------+-------------------+--------+-------+--------------------+
|index_0|index_1|index_2|ps_vs_a|ps_vs_is|ps_vs_p|          ps_vs_px|           ps_vs_py|ps_vs_sv|ps_vs_t|            ps_vs_ta|
+-------+-------+-------+-------+--------+-------+------------------+-------------------+--------+-------+--------------------+
|      0|      0|      0|   true|    null|  71250|-46.74698333333334| -23.64686166666667|    null|  23:12|2025-01-30T02:00:42Z|
|      0|      0|      1|   true|    null|  71173|         -46.75364|-23.649099999999997|    null|  23:14|2025-01-30T02:00:57Z|
|      0|      1|      0|   true|    null|  71250|-46.74698333333334| -23.64686166666667|    null|  23:16|2025-01-30T02:00:42Z|
|      0|      1|      1|   true|    null|  71173|         -46.75364|-23.649099999999997|    null|  23:19|2025-01-30T02:00:57Z|
|      0|     10|      0|   true|    null|  71173|         -46.75364|-23.649099999999997|    null|  23:0

In [151]:
df_3_2 = df_3.join(df_2, ['index_1', 'index_0']).select(df_3.index_0, df_3.index_1, df_3.index_2, df_3.ps_vs_a, df_3.ps_vs_p, df_3.ps_vs_px, df_3.ps_vs_py, df_3.ps_vs_t, df_3.ps_vs_ta, df_2.ps_cp, df_2.ps_np, df_2.ps_px, df_2.ps_py) 

In [152]:
df_table = df_3_2.join(df_1, 'index_0').select(df_3_2.ps_vs_a, df_3_2.ps_vs_p, df_3_2.ps_vs_px, df_3_2.ps_vs_py, df_3_2.ps_vs_t, df_3_2.ps_vs_ta, df_3_2.ps_cp, df_3_2.ps_np, df_3_2.ps_px, df_3_2.ps_py, df_1.hr) 

In [153]:
df_table.show()

+-------+-------+------------------+-------------------+-------+--------------------+---------+--------------------+----------+----------+-----+
|ps_vs_a|ps_vs_p|          ps_vs_px|           ps_vs_py|ps_vs_t|            ps_vs_ta|    ps_cp|               ps_np|     ps_px|     ps_py|   hr|
+-------+-------+------------------+-------------------+-------+--------------------+---------+--------------------+----------+----------+-----+
|   true|  71250|-46.74698333333334| -23.64686166666667|  23:12|2025-01-30T02:00:42Z|720015611|                    |-46.700154|-23.620215|23:01|
|   true|  71173|         -46.75364|-23.649099999999997|  23:14|2025-01-30T02:00:57Z|720015611|                    |-46.700154|-23.620215|23:01|
|   true|  71250|-46.74698333333334| -23.64686166666667|  23:16|2025-01-30T02:00:42Z|340015493|                    |-46.692094|-23.592919|23:01|
|   true|  71173|         -46.75364|-23.649099999999997|  23:19|2025-01-30T02:00:57Z|340015493|                    |-46.692094|-23